Skip to content
Snippets Groups Projects
gstlal_inspiral_marginalize_likelihoods_online 11.6 KiB
Newer Older
# Copyright (C) 2012,2014  Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
### This program runs gstlal_inspiral_marginalize_likelihood in a while True
### loop; See gstlal_inspiral_marginalize_likelihoods_online for more details
###
###
### This program is not meant to be executed standalone by a user. It should be
### part of a DAG managing a running gstlal_inspiral online analysis.
###
### This program takes two or more arguments;
###
### 1. The path of the output file name
### 2. One or more paths to text files each containing a single line giving
###    the root URL of a web server from which to retrieve event parameter
###    distribution data (e.g., "http://node001.ligo.caltech.edu")
###
### This program queries each running gstlal_inspiral job via the URL,
### computes PDFs of the likelihood ratio ranking statistics from the
### parameter distribution data, then marginalizes the ranking statistic PDFs
### across jobs and writes the result to the given output file.
###
### It continues to do that in an infinite loop with a 10 minute pause on
### each iteration.  Files are not overwritten directly, but rather via a
### temporary file and mv operation to ensure that no files are corrupted in
### a POSIX file environment.
###
### Review status
### -------------
###
### +---------------------------------------------+---------------------------------------------+------------+
### | Names                                       | Hash                                        | Date       |
### +=============================================+=============================================+============+
### | Florent, Jolien, Kipp, Chad                 | 0e96523b8846e5a4597ba3477c8462443470cd94    | 2015-05-15 |
### +---------------------------------------------+---------------------------------------------+------------+
###
from optparse import OptionParser
import sys
import time
import logging
import tempfile
import os
import shutil
import itertools
Rebecca Ewing's avatar
Rebecca Ewing committed
from gstlal import inspiral
from gstlal import events
from gstlal.datafind import DataCache, DataType
from collections import deque
from urllib.error import URLError, HTTPError
from ligo import lw
from ligo.lw import ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from gstlal import far
def parse_command_line():
	parser = OptionParser()
	parser.add_option("--output-path", metavar = "path", help = "Set the path where the output PDFs are stored. Optional")
	parser.add_option("--registry", metavar = "filename", action = "append", help = "")
	parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
Rebecca Ewing's avatar
Rebecca Ewing committed
	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
	parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
	parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
	options, filenames = parser.parse_args()
	if options.registry is None:
		raise ValueError("must provide at least one registry file.")
	return options

def calc_rank_pdfs(url, samples, num_cores, verbose = False):
	"""
	create a Ranking Stat PDF from a url
	tries = 0
	failed = 1
	while tries < 3:
		try:
			rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
			failed = 0
			break
		except (URLError, HTTPError) as e:
			logging.warning(f'Caught error while running calc rank pdfs: {e}.')
			tries += 1

	if not failed:
		lr_rankingstat = rankingstat.copy()
		lr_rankingstat.finish()
		rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)

		return 1, rankingstatpdf
	else:
		return 0, None


def url_from_registry(registry, path):
	"""
	parse a registry file and return
	the url to given path 
	"""
	with open(registry, "r") as f:
		server = f.read()
	server = server.replace("\n", "")
	logging.debug(f'server: {server}')

	return os.path.join(server, path)


def main():
	options = parse_command_line()

	# set up logging
	log_level = logging.INFO if options.verbose else logging.DEBUG
	logging.basicConfig(format = '%(asctime)s | marginalize_likelihoods: %(levelname)s : %(message)s')
	logging.getLogger().setLevel(log_level)

	registries = options.registry

	failed = deque(maxlen = len(registries))

	# options for generating ranking stat pdfs
	# get 10 million samples
	ranking_stat_samples = int(10000000 / len(registries))

	#
	# set up the output paths
	#

	marg_pdf = DataCache.find(DataType.DIST_STAT_PDFS, root = options.output_path)
	pdfs = DataCache.find(DataType.DIST_STAT_PDFS, svd_bins = "*", root = options.output_path)
	if len(marg_pdf) == 1 and len(pdfs) == len(registries):
		files_exist = True
	elif len(marg_pdf) == 0 and len(pdfs) == 0:
		files_exist = False
	elif len(marg_pdf) == 1 and len(pdfs) != len(registries): 
		raise ValueError(f"Number of registry files provided ({len(registries)}) does not match number of DIST_STAT_PDF files found ({len(pdfs)})")
	else:
		raise ValueError("Could not find marg DIST_STAT_PDF file")

	svd_bins = [reg[:4] for reg in registries]
	if files_exist:
		assert set(pdfs.groupby('svd_bin').keys()) == set(svd_bins), "svd bins of registry files are not the same as svd bins of found PDFs"
	else:
		marg_pdf = DataCache.generate(DataType.DIST_STAT_PDFS, options.ifo, root = options.output_path)
		pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, options.ifo, svd_bins = svd_bins, root = options.output_path)

	pdfs = pdfs.groupby('svd_bin')

	#
	# paths to data objects on each job's web management interface
	#

	likelihood_path="rankingstat.xml"
	zerolag_counts_path="zerolag_rankingstatpdf.xml"

Rebecca Ewing's avatar
Rebecca Ewing committed
	#
	# send heartbeat messages for the purpose of monitoring
	#
	if options.output_kafka_server:
		kafka_processor = events.EventProcessor(
			kafka_server=options.output_kafka_server,
			tag=options.tag,
			send_heartbeats = True,
			heartbeat_cadence = 60.,
			heartbeat_topic = f"gstlal.{options.tag}.marginalize_likelihoods_online_heartbeat"
		)
	else:
		kafka_processor = None

	#
	# pause on start up to make sure all inspiral jobs are running
	#

	sleep=600
	logging.info(f"... sleeping for {sleep} seconds ...")
	time.sleep(sleep)

Rebecca Ewing's avatar
Rebecca Ewing committed
	if kafka_processor:
		kafka_processor.heartbeat()

	#
	# loop forever
	#

	while True:
		# collect the current coinc parameter PDF data from each job, and
		# run some iterations of the ranking statistic sampler to generate
		# noise and signal model ranking statistic histograms for each job.
		# NOTE:  the zero-lag ranking statistic histograms in the files
		# generated here are all 0.
		data = None

		for reg in registries:
			logging.info(f"Querying registry {reg}...")
			url = url_from_registry(reg, likelihood_path)

			svd_bin = reg[:4]
			# load the old ranking stat pdf for this bin:
			old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdfs[svd_bin][0], verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)) if files_exist else None

			# create the new ranking stat pdf and marginalize as we go
			new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)

			add_to_data = 0
			if new_pdf_status and old_pdf:
				pdf += old_pdf
				add_to_data = 1
			elif new_pdf_status and not old_pdf:
				add_to_data = 1
			elif not new_pdf_status and old_pdf:
				pdf = old_pdf
				add_to_data = 1
				failed.append(reg)
			if add_to_data:
				# make sure the zerolag in the pdf is empty
				pdf.zero_lag_lr_lnpdf.count.array[:] = 0.

				if new_pdf_status:
					# save the new PDF + old PDF (if it exists) to disk before extinction
					xmldoc = lw.ligolw.Document()
					xmldoc.appendChild(lw.ligolw.LIGO_LW())
					process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
					far.gen_likelihood_control_doc(xmldoc, None, pdf)
					process.set_end_time_now()
					ligolw_utils.write_url(xmldoc, pdfs[svd_bin].files[0], verbose = options.verbose, trap_signals = None)


				# get the zerolag pdf for this bin and use it to perform bin-specific extinction
				zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
				pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
				if data:
					data += pdf.new_with_extinction()
				else:
					data = pdf.new_with_extinction()

Rebecca Ewing's avatar
Rebecca Ewing committed
			# while looping through registries
			# send heartbeat messages
			if kafka_processor:
				kafka_processor.heartbeat()


		# if we fail to complete more than 1% of the bins,
		# this is a serious problem and we should just quit
		# and restart from scratch
		if len(failed) >= 0.01 * len(registries):
			logging.info(f"Failed to complete {len(failed)} registries out of {len(registries)}, exiting.")
			sys.exit(1)

		# otherwise, lets cut our losses and continue
		logging.info(f"Done with calc rank pdfs. Failed registries: {failed}")

		# sum the noise and signal model ranking statistic histograms
		# across jobs, and collect and sum the current observed zero-lag
		# ranking statistic histograms from all jobs. combine with the
		# noise and signal model ranking statistic histograms.  NOTE:  the
		# noise and signal model ranking statistic histograms in the
		# zero-lag counts files downloaded from the jobs must be all 0, and
		# the zero-lag counts in the output generated by
		# gstlal_inspiral_calc_rank_pdfs must be 0.  NOTE:  this is where
		# the zero-lag counts have the density estimation transform
		# applied.

		zerolag_counts_url = url_from_registry("gstlal_ll_inspiral_trigger_counter_registry.txt", zerolag_counts_path)

		# add zerolag counts url to marginalized data
		data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")

Rebecca Ewing's avatar
Rebecca Ewing committed
		if kafka_processor:
			kafka_processor.heartbeat()

		# apply density estimation and normalize the PDF
		data.density_estimate_zero_lag_rates()

		# write output document
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})

		far.gen_likelihood_control_doc(xmldoc, None, data)
		process.set_end_time_now()
		ligolw_utils.write_filename(xmldoc, marg_pdf.files[0], verbose = options.verbose)
		logging.info(f"Done marginalizing likelihoods.")

		# we just created the bin-specific and marg DIST_STAT_PDFs,
		# so the files definitely exist for the next iteration of the loop
		files_exist = True

Rebecca Ewing's avatar
Rebecca Ewing committed
		if kafka_processor:
			kafka_processor.heartbeat()