Skip to content
Snippets Groups Projects
gstlal_inspiral_marginalize_likelihoods_online 15.5 KiB
Newer Older
# Copyright (C) 2012,2014  Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
### This program runs gstlal_inspiral_marginalize_likelihood in a while True
### loop; See gstlal_inspiral_marginalize_likelihoods_online for more details
###
###
### This program is not meant to be executed standalone by a user. It should be
### part of a DAG managing a running gstlal_inspiral online analysis.
###
### This program takes two or more arguments;
###
### 1. The path of the output file name
### 2. One or more paths to text files each containing a single line giving
###    the root URL of a web server from which to retrieve event parameter
###    distribution data (e.g., "http://node001.ligo.caltech.edu")
###
### This program queries each running gstlal_inspiral job via the URL,
### computes PDFs of the likelihood ratio ranking statistics from the
### parameter distribution data, then marginalizes the ranking statistic PDFs
### across jobs and writes the result to the given output file.
###
### It continues to do that in an infinite loop with a 10 minute pause on
### each iteration.  Files are not overwritten directly, but rather via a
### temporary file and mv operation to ensure that no files are corrupted in
### a POSIX file environment.
###
### Review status
### -------------
###
### +---------------------------------------------+---------------------------------------------+------------+
### | Names                                       | Hash                                        | Date       |
### +=============================================+=============================================+============+
### | Florent, Jolien, Kipp, Chad                 | 0e96523b8846e5a4597ba3477c8462443470cd94    | 2015-05-15 |
### +---------------------------------------------+---------------------------------------------+------------+
###
from optparse import OptionParser
import sys
import time
import logging
import tempfile
import os
import shutil
import itertools
from collections import deque
from urllib.error import URLError, HTTPError
from ligo import lw
from ligo.lw import ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
Prathamesh Joshi's avatar
Prathamesh Joshi committed
from lal.utils import CacheEntry
from gstlal import inspiral
from gstlal import events
from gstlal.datafind import DataCache, DataType, T050017_filename, DEFAULT_BACKUP_DIR
def parse_command_line():
	parser = OptionParser()
Prathamesh Joshi's avatar
Prathamesh Joshi committed
	parser.add_option("--output", metavar = "path", help = "Set the path where the output marginalized PDF is stored")
	parser.add_option("--registry", metavar = "filename", action = "append", help = "")
	parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
Rebecca Ewing's avatar
Rebecca Ewing committed
	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
	parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
	parser.add_option("--fast-burnin", action = "store_true", default = False, help = "Set whether to enable fast burn-in or not. If enabled, only 67% of bins are required to contribute to the marginalized PDF")
	parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
	options, filenames = parser.parse_args()
	if options.output is None:
		raise ValueError("must set --output.")
	if options.registry is None:
		raise ValueError("must provide at least one registry file.")
	return options

def calc_rank_pdfs(url, samples, num_cores, verbose = False):
	"""
	"""
	logger = logging.getLogger("marginalize_likelihoods_online")
	try:
		rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
	except (URLError, HTTPError) as e:
		logger.warning(f'Caught error while running calc rank pdfs: {e}.')
		return 0, None

	lr_rankingstat = rankingstat.copy()
	lr_rankingstat.finish()
	if lr_rankingstat.is_healthy():
		rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
	else:
		# create an empty rankingstat pdf, so that the template ids
		# from this bin get added to the marg rankingstat pdf
		rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = 0, nthreads = num_cores, verbose = verbose)
		# wait a bit so that we're not constatntly firing HTTP
		# requests to jobs at the start of an analysis
		time.sleep(2)

	return 1, rankingstatpdf


def process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, num_cores, verbose = False):
	"""
	This method does the following operations:
	1. loads the old PDF for this bin
	2. calls calc_rank_pdfs to compute the new PDF for this bin
	3. adds the two together
	4. saves the sum to disk if the new PDF was successfully calculated
	5. gets the zerolag for that bin and performs bin-specific (first round) extinction
	6. returns the success status of the new PDF, extinction status of the old+new PDF, and the extincted PDF
	logger = logging.getLogger("marginalize_likelihoods_online")
	logger.info(f"Querying registry {reg}...")
	url = url_from_registry(reg, likelihood_path)
	svd_bin = reg[:4]
	pdf_path = pdfs[svd_bin].files[0]
	if os.path.isfile(pdf_path):
		# load the old ranking stat pdf for this bin:
		_, old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdf_path, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
		logger.warning(f"Couldn't find {pdf_path}, starting from scratch")
		old_pdf = None

	# create the new ranking stat pdf and marginalize as we go
	new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, num_cores, verbose = verbose)

	# add the old and new pdfs if they are available
	if new_pdf_status and old_pdf:
		# both are available
		pdf += old_pdf
	if not new_pdf_status and old_pdf:
		# only the old pdf is available, overwrite pdf with it
		pdf = old_pdf

	# make sure the zerolag in the pdf is empty
	if pdf:
		pdf.zero_lag_lr_lnpdf.count.array[:] = 0.

	# save the new PDF + old PDF (if it exists) to disk
	if new_pdf_status:
		xmldoc = lw.ligolw.Document()
		xmldoc.appendChild(lw.ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
		far.gen_likelihood_control_doc(xmldoc, None, pdf)
		process.set_end_time_now()
		ligolw_utils.write_url(xmldoc, pdf_path, verbose = verbose, trap_signals = None)

	if pdf:
		# get the zerolag pdf for this bin and use it to perform bin-specific extinction
		zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
		try:
			pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
		except (URLError, HTTPError) as e:
			logger.warning(f'Caught error while trying to get zerolag for bin {svd_bin}: {e}.')
		if pdf.ready_for_extinction():
			# LR calculation has started and we are ready to perform first-round extinction
			pdf = pdf.new_with_extinction()
		else:
			# add a zeroed-out PDF instead, so that the template ids get added to data
			logger.warning(f'Skipping first-round extinction for {pdfs[svd_bin].files[0]}, using an empty PDF instead')
			pdf.noise_lr_lnpdf.array[:] = 0.
			pdf.signal_lr_lnpdf.array[:] = 0.
			pdf.zero_lag_lr_lnpdf.array[:] = 0.

	return new_pdf_status, extinction_status, pdf


def url_from_registry(registry, path):
	"""
	parse a registry file and return
	the url to given path 
	"""
	with open(registry, "r") as f:
		server = f.read()
	server = server.replace("\n", "")
	logger = logging.getLogger("marginalize_likelihoods_online")
	logger.info(f'server: {server}')

	return os.path.join(server, path)


def main():
	options = parse_command_line()

	# set up logging
	log_level = logging.INFO if options.verbose else logging.WARNING
	logging.basicConfig(format = '%(asctime)s | marginalize_likelihoods_online: %(levelname)s : %(message)s')
	logger = logging.getLogger("marginalize_likelihoods_online")
	logger.setLevel(log_level)

	registries = options.registry

	# options for generating ranking stat pdfs
	# get 10 million samples
	ranking_stat_samples = int(10000000 / len(registries))

	#
	# set up the output paths
	#

	svd_bins = [reg[:4] for reg in registries]
	pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, CacheEntry.from_T050017(options.output).observatory, svd_bins = svd_bins)
	pdfs = pdfs.groupby('svd_bin')

	#
	# paths to data objects on each job's web management interface
	#

	likelihood_path="rankingstat.xml"
	zerolag_counts_path="zerolag_rankingstatpdf.xml"

Rebecca Ewing's avatar
Rebecca Ewing committed
	#
	# send heartbeat messages for the purpose of monitoring
	#
	if options.output_kafka_server:
		kafka_processor = events.EventProcessor(
			kafka_server=options.output_kafka_server,
			tag=options.tag,
			send_heartbeats = True,
			heartbeat_cadence = 60.,
			heartbeat_topic = f"gstlal.{options.tag}.marginalize_likelihoods_online_heartbeat"
		)
	else:
		kafka_processor = None

	#
	# pause on start up to make sure all inspiral jobs are running
	#

	sleep=600
	logger.info(f"... sleeping for {sleep} seconds ...")
Rebecca Ewing's avatar
Rebecca Ewing committed
	if kafka_processor:
		kafka_processor.heartbeat()

	#
	# loop forever
	#

	while True:
		# collect the current coinc parameter PDF data from each job, and
		# run some iterations of the ranking statistic sampler to generate
		# noise and signal model ranking statistic histograms for each job.
		# NOTE:  the zero-lag ranking statistic histograms in the files
		# generated here are all 0.
		data = None
		failed = deque(maxlen = len(registries))
		num_extincted = 0 # number of bins for whom we were able to perform first-round extinction
			# process every svd bin, retry twice if it failed
			for tries in range(3):
				status, extinction_status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
				if status:
					# add pdf to data
					if data:
						data += pdf
					else:
						data = pdf
			if not status:
				logger.info(f"failed to complete {reg} during regular running")
			# while looping through registries
			# send heartbeat messages
			if kafka_processor:
				kafka_processor.heartbeat()

		# retry registries that we failed to process the first time
		# and remove from the deque upon success
		for reg in list(failed):
			status, extinction_status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
			if status:
				logger.info(f"completed {reg} on final retry")
				failed.remove(reg)
				# add pdf to data
				if data:
					data += pdf
					data = pdf
			else:
				logger.info(f"failed to complete {reg} on final retry")
				# add pdf to data anyway, as this will contain the extincted
				# old pdf for this bin. Adding the old pdf for this bin is 
				# better than adding no pdf for this bin. Don't remove the
				# registry from the failed deque
				if pdf:
Rebecca Ewing's avatar
Rebecca Ewing committed

Rebecca Ewing's avatar
Rebecca Ewing committed
			if kafka_processor:
				kafka_processor.heartbeat()
		
		# zero out the zerolag after the first round of extinction is finished
		if data:
			data.zero_lag_lr_lnpdf.count.array[:] = 0

		# if we fail to complete more than 1% of the bins,
		# this is a serious problem and we should just quit
		# and restart from scratch
		if len(failed) >= 0.01 * len(registries):
			logger.critical(f"Failed to complete {len(failed)} registries out of {len(registries)}, exiting.")
			sys.exit(1)

		# otherwise, lets cut our losses and continue
		logger.info(f"Done with calc rank pdfs. Failed registries: {failed}")

		# sum the noise and signal model ranking statistic histograms
		# across jobs, and collect and sum the current observed zero-lag
		# ranking statistic histograms from all jobs. combine with the
		# noise and signal model ranking statistic histograms.  NOTE:  the
		# noise and signal model ranking statistic histograms in the
		# zero-lag counts files downloaded from the jobs must be all 0, and
		# the zero-lag counts in the output generated by
		# gstlal_inspiral_calc_rank_pdfs must be 0.  NOTE:  this is where
		# the zero-lag counts have the density estimation transform
		# applied.

		zerolag_counts_url = url_from_registry("gstlal_ll_inspiral_trigger_counter_registry.txt", zerolag_counts_path)

		# add zerolag counts url to marginalized data
		if data:
			data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
		else:
			data = far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
Rebecca Ewing's avatar
Rebecca Ewing committed
		if kafka_processor:
			kafka_processor.heartbeat()

		# apply density estimation and normalize the PDF
		data.density_estimate_zero_lag_rates()

		# write output document only if 99% of bins have been extincted and
		# hence have contributed to the noise diststat PDF. Otherwise, the 
		# PDF will not be representative of the LRs across all bins
		# If fast burn-in is enabled, this value is changed to 67%. This is
		# taken from how many bins are ready_for_extinction after a 2 week
		# period in an HL EW analysis with 1000 templates / bin
		# NOTE: This option is not meant to be used for analyses other than
		# the one mentioned above.
		if num_extincted >= 0.99 * len(registries) or (options.fast_burnin and num_extincted >= 0.666667 * len(registries)):
			xmldoc = ligolw.Document()
			xmldoc.appendChild(ligolw.LIGO_LW())
			process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})

			far.gen_likelihood_control_doc(xmldoc, None, data)
			process.set_end_time_now()
			ligolw_utils.write_filename(xmldoc, options.output, verbose = options.verbose)

			# save the same file to the backup dir as a precaution
			now = int(inspiral.now())
			f = CacheEntry.from_T050017(options.output)
			backup_dir = os.path.join(DEFAULT_BACKUP_DIR, os.path.dirname(options.output))
			if not os.path.exists(backup_dir):
				os.makedirs(backup_dir)
			backup_fname = T050017_filename(f.observatory, f.description, (now, now), "xml.gz")
			backup_fname = os.path.join(backup_dir, backup_fname)
			ligolw_utils.write_filename(xmldoc, backup_fname, verbose = options.verbose)
		logger.info(f"Done marginalizing likelihoods.")
Rebecca Ewing's avatar
Rebecca Ewing committed
		if kafka_processor:
			kafka_processor.heartbeat()