diff --git a/gstlal-inspiral/bin/gstlal_inspiral_marginalize_likelihoods_online b/gstlal-inspiral/bin/gstlal_inspiral_marginalize_likelihoods_online
index dc4599c7f3e61859809ac52093a961b0090f0931..1ddb16798e9854c9c08b4081f9b766688d77f1ec 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_marginalize_likelihoods_online
+++ b/gstlal-inspiral/bin/gstlal_inspiral_marginalize_likelihoods_online
@@ -60,6 +60,7 @@ import shutil
 import itertools
 from gstlal import inspiral
 from gstlal import events
+from gstlal.datafind import DataCache, DataType
 from collections import deque
 from urllib.error import URLError, HTTPError
 
@@ -68,17 +69,19 @@ from ligo import lw
 from ligo.lw import ligolw
 from ligo.lw import utils as ligolw_utils
 from ligo.lw.utils import process as ligolw_process
+from lal.utils import CacheEntry
 
 from gstlal import far
 
 
 def parse_command_line():
	parser = OptionParser()
-	parser.add_option("--output", metavar = "filename", help = "")
+	parser.add_option("--output", metavar = "path", help = "Set the path where the output marginalized PDF is stored")
 	parser.add_option("--registry", metavar = "filename", action = "append", help = "")
 	parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
 	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
 	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
+	parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
 	parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
 
 	options, filenames = parser.parse_args()
@@ -92,20 +95,28 @@ def parse_command_line():
 
 def calc_rank_pdfs(url, samples, num_cores, verbose = False):
 	"""
-	load Ranking Stat PDF from a url
+	create a Ranking Stat PDF from a url
 	"""
-	try:
-		rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
-	except (URLError, HTTPError) as e:
-		logger = logging.getLogger("marginalize_likelihoods_online")
-		logger.warning(f'Caught error while running calc rank pdfs: {e}.')
-		return 0, None
-
-	lr_rankingstat = rankingstat.copy()
-	lr_rankingstat.finish()
-	rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
+	tries = 0
+	failed = 1
+	while tries < 3:
+		try:
+			rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
+			failed = 0
+			break
+		except (URLError, HTTPError) as e:
+			logger.warning(f'Caught error while running calc rank pdfs: {e}.')
+			tries += 1
+
+	if not failed:
+		lr_rankingstat = rankingstat.copy()
+		lr_rankingstat.finish()
+		rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
+
+		return 1, rankingstatpdf
 
-	return 1, rankingstatpdf
+	else:
+		return 0, None
 
 
 def url_from_registry(registry, path):
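The reworked calc_rank_pdfs above now retries the PDF construction up to three times before giving up. A minimal, self-contained sketch of that bounded-retry pattern is below; the helper name fetch_with_retries is hypothetical, and note that the new hunk calls logger.warning() inside the function, so it appears to assume a module-level logger (e.g. logging.getLogger("marginalize_likelihoods_online")) rather than the local one the old code created.

    import logging
    from urllib.error import URLError, HTTPError

    logger = logging.getLogger("marginalize_likelihoods_online")

    def fetch_with_retries(fetch, url, max_tries = 3):
        # call fetch(url), retrying on network errors; mirrors the
        # (status, result) convention returned by calc_rank_pdfs
        for attempt in range(1, max_tries + 1):
            try:
                return 1, fetch(url)
            except (URLError, HTTPError) as e:
                logger.warning(f"attempt {attempt}/{max_tries} failed for {url}: {e}")
        return 0, None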
@@ -131,7 +142,6 @@ def main():
 	logger = logging.getLogger("marginalize_likelihoods_online")
 	logger.setLevel(log_level)
 
-	output = options.output
 	registries = options.registry
 
 	failed = deque(maxlen = len(registries))
@@ -140,6 +150,14 @@
 	# get 10 million samples
 	ranking_stat_samples = int(10000000 / len(registries))
 
+	#
+	# set up the output paths
+	#
+
+	svd_bins = [reg[:4] for reg in registries]
+	pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, CacheEntry.from_T050017(options.output).observatory, svd_bins = svd_bins)
+	pdfs = pdfs.groupby('svd_bin')
+
 	#
 	# paths to data objects on each job's web management interface
 	#
@@ -188,45 +206,73 @@
 		logger.info(f"Querying registry {reg}...")
 		url = url_from_registry(reg, likelihood_path)
 
-		# load ranking stat pdf and marginalize as we go
-		status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
-		if status:
-			if data:
-				data += pdf
-			else:
-				data = pdf
+		svd_bin = reg[:4]
+		if os.path.isfile(pdfs[svd_bin].files[0]):
+			# load the old ranking stat pdf for this bin:
+			_, old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdfs[svd_bin].files[0], verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
+		else:
+			logger.warning(f"Couldn't find {pdfs[svd_bin].files[0]}, starting from scratch")
+			old_pdf = None
+
+		# create the new ranking stat pdf and marginalize as we go
+		new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
+
+		add_to_data = 0
+		if new_pdf_status and old_pdf:
+			pdf += old_pdf
+			add_to_data = 1
+		elif new_pdf_status and not old_pdf:
+			add_to_data = 1
+		elif not new_pdf_status and old_pdf:
+			pdf = old_pdf
+			add_to_data = 1
+			failed.append(reg)
 		else:
 			failed.append(reg)
 
-		# while looping through registries
-		# send heartbeat messages
-		if kafka_processor:
-			kafka_processor.heartbeat()
-
-	# retry registries that we failed to process the first time
-	# give each registry a maximum of 3 retries, and remove from
-	# the deque upon success
-	retry = 1
-	while retry <= 3 and failed:
-		for reg in list(failed):
-			url = url_from_registry(reg, likelihood_path)
-
-			# load ranking stat pdf and marginalize as we go
-			status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
-			if status:
-				logger.info(f"completed {reg} on retry: {retry}")
-				failed.remove(reg)
+		if add_to_data:
+			# make sure the zerolag in the pdf is empty
+			pdf.zero_lag_lr_lnpdf.count.array[:] = 0.
+
+			if new_pdf_status:
+				# save the new PDF + old PDF (if it exists) to disk before extinction
+				xmldoc = lw.ligolw.Document()
+				xmldoc.appendChild(lw.ligolw.LIGO_LW())
+				process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
+				far.gen_likelihood_control_doc(xmldoc, None, pdf)
+				process.set_end_time_now()
+				ligolw_utils.write_url(xmldoc, pdfs[svd_bin].files[0], verbose = options.verbose, trap_signals = None)
+
+
+			# get the zerolag pdf for this bin and use it to perform bin-specific extinction
+			zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
+			pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
+			if pdf.ready_for_extinction():
+				# LR calculation has started and we are ready to perform first-round extinction
+				if data:
+					data += pdf.new_with_extinction()
+				else:
+					data = pdf.new_with_extinction()
+			else:
+				# add a zeroed-out PDF instead, so that the template ids get added to data
+				logger.warning(f'Skipping first-round extinction for {pdfs[svd_bin].files[0]}, using an empty PDF instead')
+				pdf.noise_lr_lnpdf.array[:] = 0.
+				pdf.signal_lr_lnpdf.array[:] = 0.
+				pdf.zero_lag_lr_lnpdf.array[:] = 0.
 				if data:
 					data += pdf
 				else:
 					data = pdf
-			else:
-				logger.warning(f"failed to complete {reg} on retry: {retry}")
-
-			if kafka_processor:
-				kafka_processor.heartbeat()
+
-		retry += 1
+		# while looping through registries
+		# send heartbeat messages
+		if kafka_processor:
+			kafka_processor.heartbeat()
+
+	# zero out the zerolag after the first round of extinction is finished
+	if data:
+		data.zero_lag_lr_lnpdf.count.array[:] = 0
 
 
 	# if we fail to complete more than 1% of the bins,
 	# this is a serious problem and we should just quit
@@ -252,40 +298,14 @@ def main():
 
 	zerolag_counts_url = url_from_registry("gstlal_ll_inspiral_trigger_counter_registry.txt", zerolag_counts_path)
 
 	# add zerolag counts url to marginalized data
-	data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
+	if data:
+		data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
+	else:
+		data = far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
 
 	if kafka_processor:
 		kafka_processor.heartbeat()
 
-	# NOTE comment this to unmix in previous samples
-	if os.path.isfile(options.output):
-		prev_output, prevoutput_path = tempfile.mkstemp(".xml.gz", dir=os.getenv("_CONDOR_SCRATCH_DIR", tempfile.gettempdir()))
-		logger.info(f'Copying {options.output} to {prevoutput_path}')
-		shutil.copy(options.output, prevoutput_path)
-
-		_, zlpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(prevoutput_path, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
-
-		# Zero it out
-		zlpdf.zero_lag_lr_lnpdf.count.array[:] = 0.
-
-		# write out the file to disk
-		xmldoc = lw.ligolw.Document()
-		xmldoc.appendChild(lw.ligolw.LIGO_LW())
-		process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
-		far.gen_likelihood_control_doc(xmldoc, None, zlpdf)
-		process.set_end_time_now()
-		ligolw_utils.write_url(xmldoc, prevoutput_path, verbose = options.verbose, trap_signals = None)
-
-		# add previous output to marginalized data
-		data += far.RankingStatPDF.from_xml(xmldoc, u"gstlal_inspiral_likelihood")
-
-		if kafka_processor:
-			kafka_processor.heartbeat()
-
-	else:
-		prevoutput_path=""
-	logger.info(f"Previous output: {prevoutput_path}")
-
 	# apply density estimation and normalize the PDF
 	data.density_estimate_zero_lag_rates()
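The per-bin output paths introduced above are built with DataCache.generate(), which takes the observatory string parsed from --output by CacheEntry.from_T050017. This assumes the output filename follows the T050017 convention (OBSERVATORIES-DESCRIPTION-GPSSTART-DURATION.ext). A small illustration with a made-up filename:

    from lal.utils import CacheEntry

    # hypothetical filename following the T050017 naming convention
    ce = CacheEntry.from_T050017("H1L1-GSTLAL_DIST_STAT_PDFS-1234567890-4096.xml.gz")
    print(ce.observatory)   # "H1L1"
    print(ce.description)   # "GSTLAL_DIST_STAT_PDFS"
    print(ce.segment)       # GPS span 1234567890 to 1234571986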
diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter
index affc760a0583e3bbdbf9203be733c7f8a1057d42..9961ccb694a62d276d51b9864c2b6fb6eacd8e5c 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter
@@ -345,13 +345,14 @@ class EventPlotter(events.EventProcessor):
 			else:
 				# no sngl for this instrument
 				fig = plotfar.plot_snr_chi_pdf(rankingstat, instrument, plot_type, chi_type, self.max_snr)
-			filename = '{}_{}_{}_snr{}.{}'.format(event['gid'], instrument, plot_type, chi_type, self.format)
-			if not self.no_upload:
-				lvalert_helper.upload_fig(fig, self.client, event['gid'], filename = filename, log_message = '%s SNR, %ssq PDF' % (instrument, chi_type), tagname = 'background')
-			if self.output_path is not None:
-				filename = os.path.join(self.output_path, filename)
-				logger.info('writing {} ...'.format(filename))
-				fig.savefig(filename)
+			if fig is not None:
+				filename = '{}_{}_{}_snr{}.{}'.format(event['gid'], instrument, plot_type, chi_type, self.format)
+				if not self.no_upload:
+					lvalert_helper.upload_fig(fig, self.client, event['gid'], filename = filename, log_message = '%s SNR, %ssq PDF' % (instrument, chi_type), tagname = 'background')
+				if self.output_path is not None:
+					filename = os.path.join(self.output_path, filename)
+					logger.info('writing {} ...'.format(filename))
+					fig.savefig(filename)
 
 		fig = plotfar.plot_likelihood_ratio_ccdf(fapfar, (0., max(40., coinc_event.likelihood - coinc_event.likelihood % 5. + 5.)), ln_likelihood_ratio_markers = (coinc_event.likelihood,))
 		filename = '{}_likehoodratio_ccdf.{}'.format(event['gid'], self.format)
diff --git a/gstlal-inspiral/python/far.py b/gstlal-inspiral/python/far.py
index 7859bb7f59d590d31404b1b07227e8a02642f277..9c7a97c4cb72325454820604965f769d373b1580 100644
--- a/gstlal-inspiral/python/far.py
+++ b/gstlal-inspiral/python/far.py
@@ -59,6 +59,7 @@ import math
 import multiprocessing
 import numpy
 import random
+import scipy
 from scipy import interpolate
 import sys
 import time
@@ -519,6 +520,9 @@ def binned_log_likelihood_ratio_rates_from_samples(signal_lr_lnpdf, noise_lr_lnp
 class RankingStatPDF(object):
 	ligo_lw_name_suffix = u"gstlal_inspiral_rankingstatpdf"
 
+	extinction_fitting_limits = (1/2., 1/100.)
+	# extinction curve fitting is done for the range of LRs
+	# corresponding to the 1/2 and 1/100 point of the zl CCDF
 
 	@staticmethod
 	def density_estimate(lnpdf, name, kernel = rate.gaussian_window(4.)):
@@ -712,89 +716,73 @@ WHERE
 		return self
 
-	def new_with_extinction(self):
-		self = self.copy()
+	def ready_for_extinction(self):
+		# ensure we have sufficient zerolag and noise samples before attempting the extinction curve fit
+		# none of the checks below should evaluate to True even with a small amount of zerolag and noise
+		# samples. They should only evaluate to True at the start of an online analysis
+		bg = self.noise_lr_lnpdf.copy().array
+		fg = self.zero_lag_lr_lnpdf.copy().array
+		bg[:10] = 0.
+		fg[:10] = 0.
+		if fg.sum() == 0 or bg.sum() == 0:
+			return False
+
+		fg_ccdf = numpy.cumsum(fg[::-1])[::-1]
+		ix_min = (fg_ccdf < fg_ccdf[0] * self.extinction_fitting_limits[0]).argmax()
+		ix_max = (fg_ccdf < fg_ccdf[0] * self.extinction_fitting_limits[1]).argmax()
+		if ix_min == ix_max:
+			return False
+
+		if fg[ix_min: ix_max + 1].sum() == 0 or bg[ix_min: ix_max + 1].sum() == 0:
+			return False
+
+		if (fg_ccdf[ix_min: ix_max + 1] == 0).any():
+			# log will evaluate to -inf, and the curve fitting will crash
+			return False
+
+		return True
 
-		# the probability that an event survives clustering is the
-		# probability that no event with a higher value of the
-		# ranking statistic is within +/- dt of the event in
-		# question. .noise_lr_lnpdf.count contains an accurate
-		# model of the counts of pre-clustered events in each
-		# ranking statistic bin, but we know the events are not
-		# independent and so the total number of them cannot be
-		# used to form a Poisson rate for the purpose of computing
-		# the probability we desire. it has been found,
-		# empirically, that if the CCDF of pre-clustered background
-		# event counts is raised to some power and the clustering
-		# survival probability computed from that as though it were
-		# the CCDF of a Poisson process with some effective mean
-		# event rate, the result is a very good model for the
-		# post-clustering distribution of ranking statistics. we
-		# have two unknown parameters: the power to which to raise
-		# the pre-clustered ranking statistic's CCDF, and the mean
-		# event rate to assume. we solve for these parameters by
-		# fitting to the observed zero-lag ranking statistic
-		# histogram
+
+	def new_with_extinction(self, verbose = False):
+		self = self.copy()
 
 		x = self.noise_lr_lnpdf.bins[0].centres()
 		assert (x == self.zero_lag_lr_lnpdf.bins[0].centres()).all()
 
-		# some candidates are rejected by the ranking statistic,
-		# causing there to be a spike in the zero-lag density at ln
-		# L = -inf. if enough candidates get rejected this spike
-		# becomes the mode of the PDF which messes up the mask
-		# constructed below for the fit. we zero the first 40 bins
-		# here to prevent that from happening (assume density
-		# estimation kernel = 4 bins wide, with 10 sigma impulse
-		# length)
-		zl_counts = self.zero_lag_lr_lnpdf.array.copy()
-		zl_counts[:40] = 0.
-		if not zl_counts.any():
-			raise ValueError("zero-lag counts are all zero")
-
-		# get the noise counts
-		noise_counts = self.noise_lr_lnpdf.array.copy()
-
-		# get the zerolag counts.
-		# we model the tail of the distribution - top 0.1 - 1% - where
-		# clustering only effects the result at a < 1% level.
-		if zl_counts.sum() < 100 * 100:
-			tail_zl_counts = zl_counts.sum() * 0.99
-		else:
-			tail_zl_counts = zl_counts.sum() - 100
-		onepercent = zl_counts.cumsum().searchsorted(tail_zl_counts)
-
-		# normalize the counts
-		noise_counts /= noise_counts.sum()
-		zl_counts /= zl_counts.sum()
-
-		# compute survival probability
-		norm = zl_counts[onepercent] / noise_counts[onepercent]
-		zl_counts[onepercent:] = 0
-		noise_counts[onepercent:] = 0
-		survival_probability = zl_counts / noise_counts
-		survival_probability[onepercent:] = norm
-		survival_probability[numpy.isnan(survival_probability)] = 0.0
-
-		# apply to background counts and signal counts
-		self.noise_lr_lnpdf.array *= survival_probability
-		self.noise_lr_lnpdf.normalize()
-		self.signal_lr_lnpdf.array *= survival_probability
-		self.signal_lr_lnpdf.normalize()
 
+		#FIXME: Add comprehensive explanation about the extinction model
 
-		#
-		# never allow PDFs that have had the extinction model
-		# applied to be written to disk: on-disk files must only
-		# ever provide the original data. forbid PDFs that have
-		# been extincted from being re-extincted.
-		#
+		lrs = self.noise_lr_lnpdf.centres()[0]
+		bg = self.noise_lr_lnpdf.array
+		fg = self.zero_lag_lr_lnpdf.array
+
+		# zero out the beginning bins of each because they are notoriously bad and should just be ignored
+		bg[:10] = 0.
+		fg[:10] = 0.
+
+
+		# fitting is done between ix_min and ix_max
+		fg_ccdf = numpy.cumsum(fg[::-1])[::-1]
+		ix_min = (fg_ccdf < fg_ccdf[0] * self.extinction_fitting_limits[0]).argmax()
+		ix_max = (fg_ccdf < fg_ccdf[0] * self.extinction_fitting_limits[1]).argmax()
+
+
+		bgtotal = bg[ix_min: ix_max + 1].sum()
+		bg_ccdf = numpy.cumsum(bg[::-1])[::-1] / bgtotal
 
-		def new_with_extinction(*args, **kwargs):
-			raise NotImplementedError("re-extincting an extincted RankingStatPDF object is forbidden")
-		self.new_with_extinction = new_with_extinction
-		def to_xml(*args, **kwargs):
-			raise NotImplementedError("writing extincted RankingStatPDF object to disk is forbidden")
-		self.to_xml = to_xml
 
+		# define a function for the extincted bg for scipy.optimize.curve_fit to call
+		def bg_ccdf_extinct_func(idx, c, A):
+			return numpy.log(A * bgtotal / c) + numpy.log1p(-numpy.exp(-1 * bg_ccdf[idx] * c))
+
+		# find the best fit c for extinction
+		c = scipy.optimize.curve_fit(bg_ccdf_extinct_func, range(ix_min, ix_max + 1), numpy.log(fg_ccdf[ix_min: ix_max + 1]), bounds = [0, numpy.inf], sigma = numpy.sqrt(1 / (bg_ccdf[ix_min: ix_max + 1] * bgtotal)))#, maxfev = 5000)
+		if verbose:
+			print(f"Best value of c is {c[0][0]}, A is {c[0][1]} with covariance {c[1]}", file = sys.stderr)
+
+		# calculate the extincted PDF
+		bg_pdf_extinct = c[0][1] * bg * numpy.exp(-1 * bg_ccdf * c[0][0])
+
+		self.noise_lr_lnpdf.array = bg_pdf_extinct
+		self.noise_lr_lnpdf.normalize()
 
 		return self
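The new new_with_extinction() above fits the clustered zero-lag CCDF as an "extincted" image of the background CCDF, ln fg_ccdf ≈ ln(A·N_bg/c) + ln(1 − exp(−c·bg_ccdf)), over the window where the zero-lag CCDF lies between 1/2 and 1/100 of its maximum (extinction_fitting_limits), and then damps the noise counts by A·exp(−c·bg_ccdf). The self-contained sketch below reproduces that fit on synthetic counts so the roles of c and A are easy to see; the histogram, the "true" parameter values, and the omission of the per-point sigma weighting are all simplifications for illustration, not the analysis code itself.

    import numpy
    import scipy.optimize

    # synthetic background counts per ln L bin (made up for illustration)
    nbins = 200
    bg = 1e4 * numpy.exp(-numpy.linspace(0., 10., nbins))
    bgtotal = bg.sum()
    bg_ccdf = numpy.cumsum(bg[::-1])[::-1] / bgtotal

    # fabricate a clustered zero-lag CCDF from known parameters
    c_true, A_true = 50., 1.2
    fg_ccdf = (A_true * bgtotal / c_true) * -numpy.expm1(-c_true * bg_ccdf)

    # fit window: where the zero-lag CCDF drops to 1/2 and 1/100 of its
    # maximum, mirroring RankingStatPDF.extinction_fitting_limits
    ix_min = (fg_ccdf < fg_ccdf[0] / 2.).argmax()
    ix_max = (fg_ccdf < fg_ccdf[0] / 100.).argmax()

    def bg_ccdf_extinct_func(idx, c, A):
        # log of the extincted background CCDF; curve_fit passes idx as floats
        idx = numpy.asarray(idx, dtype = int)
        return numpy.log(A * bgtotal / c) + numpy.log1p(-numpy.exp(-bg_ccdf[idx] * c))

    (c_fit, A_fit), cov = scipy.optimize.curve_fit(
        bg_ccdf_extinct_func,
        numpy.arange(ix_min, ix_max + 1),
        numpy.log(fg_ccdf[ix_min: ix_max + 1]),
        p0 = (10., 1.),
        bounds = (0., numpy.inf),
    )
    print(c_fit, A_fit)  # recovers roughly c_true and A_true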
NaNs" % (instrument, which)) z = numpy.ma.masked_where(numpy.isnan(z), z)