Commit 454f1032 authored by Prathamesh Joshi

Online new extinction model

parent 75027197
1 merge request: !564 Online new extinction
Pipeline #581605 passed with warnings
@@ -60,6 +60,7 @@ import shutil
import itertools
from gstlal import inspiral
from gstlal import events
from gstlal.datafind import DataCache, DataType
from collections import deque
from urllib.error import URLError, HTTPError
@@ -74,17 +75,15 @@ from gstlal import far
def parse_command_line():
parser = OptionParser()
parser.add_option("--output", metavar = "filename", help = "")
parser.add_option("--output-path", metavar = "path", help = "Set the path where the output PDFs are stored. Optional")
parser.add_option("--registry", metavar = "filename", action = "append", help = "")
parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
options, filenames = parser.parse_args()
if options.output is None:
raise ValueError("must set --output.")
if options.registry is None:
raise ValueError("must provide at least one registry file.")
@@ -92,19 +91,28 @@ def parse_command_line():
def calc_rank_pdfs(url, samples, num_cores, verbose = False):
"""
load Ranking Stat PDF from a url
create a Ranking Stat PDF from a url
"""
try:
rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
except (URLError, HTTPError) as e:
logging.warning(f'Caught error while running calc rank pdfs: {e}.')
return 0, None
lr_rankingstat = rankingstat.copy()
lr_rankingstat.finish()
rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
tries = 0
failed = 1
while tries < 3:
try:
rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
failed = 0
break
except (URLError, HTTPError) as e:
logging.warning(f'Caught error while running calc rank pdfs: {e}.')
tries += 1
if not failed:
lr_rankingstat = rankingstat.copy()
lr_rankingstat.finish()
rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
return 1, rankingstatpdf
else:
return 0, None
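For orientation, a minimal usage sketch of the retry behaviour above (not part of the commit; the URL and sample count below are placeholders, whereas in the script the URL comes from url_from_registry()): the function returns (1, rankingstatpdf) as soon as one of the three attempts succeeds, and (0, None) once all three attempts have raised URLError or HTTPError.

status, pdf = calc_rank_pdfs("http://node001.example/0000_likelihood.xml.gz", 20000, num_cores = 4, verbose = True)  # hypothetical URL and sample count
if not status:
	# all three attempts failed; the caller falls back to an old PDF or records the bin as failed
	pdf = None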
def url_from_registry(registry, path):
@@ -128,7 +136,6 @@ def main():
logging.basicConfig(format = '%(asctime)s | marginalize_likelihoods: %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
output = options.output
registries = options.registry
failed = deque(maxlen = len(registries))
@@ -137,6 +144,30 @@
# get 10 million samples
ranking_stat_samples = int(10000000 / len(registries))
#
# set up the output paths
#
marg_pdf = DataCache.find(DataType.DIST_STAT_PDFS, root = options.output_path)
pdfs = DataCache.find(DataType.DIST_STAT_PDFS, svd_bins = "*", root = options.output_path)
if len(marg_pdf) == 1 and len(pdfs) == len(registries):
files_exist = True
elif len(marg_pdf) == 0 and len(pdfs) == 0:
files_exist = False
elif len(marg_pdf) == 1 and len(pdfs) != len(registries):
raise ValueError(f"Number of registry files provided ({len(registries)}) does not match number of DIST_STAT_PDF files found ({len(pdfs)})")
else:
raise ValueError("Could not find marg DIST_STAT_PDF file")
svd_bins = [reg[:4] for reg in registries]
if files_exist:
assert set(pdfs.groupby('svd_bin').keys()) == set(svd_bins), "svd bins of registry files are not the same as svd bins of found PDFs"
else:
marg_pdf = DataCache.generate(DataType.DIST_STAT_PDFS, options.ifo, root = options.output_path)
pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, options.ifo, svd_bins = svd_bins, root = options.output_path)
pdfs = pdfs.groupby('svd_bin')
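Two practical notes on the bookkeeping above: the sampling budget defined just before this block is split evenly across bins, so with, say, 500 registry files each bin draws int(10000000 / 500) = 20000 samples for its RankingStatPDF. Also, svd_bins is taken from the first four characters of each registry filename, so the registry files are assumed to be named with their four-character SVD bin tag as a prefix (a hypothetical 0000_registry.txt would belong to bin 0000), the same reg[:4] convention the main loop uses to look up pdfs[svd_bin].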
#
# paths to data objects on each job's web management interface
#
@@ -185,45 +216,53 @@ def main():
logging.info(f"Querying registry {reg}...")
url = url_from_registry(reg, likelihood_path)
# load ranking stat pdf and marginalize as we go
status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
if status:
if data:
data += pdf
else:
data = pdf
svd_bin = reg[:4]
# load the old ranking stat pdf for this bin:
old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdfs[svd_bin].files[0], verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))[1] if files_exist else None
# create the new ranking stat pdf and marginalize as we go
new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
add_to_data = 0
if new_pdf_status and old_pdf:
pdf += old_pdf
add_to_data = 1
elif new_pdf_status and not old_pdf:
add_to_data = 1
elif not new_pdf_status and old_pdf:
pdf = old_pdf
add_to_data = 1
failed.append(reg)
else:
failed.append(reg)
if add_to_data:
# make sure the zerolag in the pdf is empty
pdf.zero_lag_lr_lnpdf.count.array[:] = 0.
if new_pdf_status:
# save the new PDF + old PDF (if it exists) to disk before extinction
xmldoc = lw.ligolw.Document()
xmldoc.appendChild(lw.ligolw.LIGO_LW())
process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
far.gen_likelihood_control_doc(xmldoc, None, pdf)
process.set_end_time_now()
ligolw_utils.write_url(xmldoc, pdfs[svd_bin].files[0], verbose = options.verbose, trap_signals = None)
# get the zerolag pdf for this bin and use it to perform bin-specific extinction
zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
if data:
data += pdf.new_with_extinction()
else:
data = pdf.new_with_extinction()
# while looping through registries
# send heartbeat messages
if kafka_processor:
kafka_processor.heartbeat()
# retry registries that we failed to process the first time
# give each registry a maximum of 3 retries, and remove from
# the deque upon success
retry = 1
while retry <= 3 and failed:
for reg in list(failed):
url = url_from_registry(reg, likelihood_path)
# load ranking stat pdf and marginalize as we go
status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
if status:
logging.info(f"completed {reg} on retry: {retry}")
failed.remove(reg)
if data:
data += pdf
else:
data = pdf
else:
logging.info(f"failed to complete {reg} on retry: {retry}")
if kafka_processor:
kafka_processor.heartbeat()
retry += 1
# if we fail to complete more than 1% of the bins,
# this is a serious problem and we should just quit
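To summarize the loop above in one place: for each registry the script queries the job for its RankingStat URL, builds a fresh RankingStatPDF from it, and adds the bin's previous on-disk DIST_STAT_PDF when one exists; if the fresh build fails but an old PDF exists, the old PDF alone is used and the registry is appended to failed, and if neither is available the registry is only appended to failed. Whichever PDF survives has its zero-lag counts zeroed, is written back to the per-bin DIST_STAT_PDF file when a fresh PDF was built, then has the job's zero-lag counts added so that new_with_extinction() can apply the bin-specific extinction before the result is accumulated into the running marginalized data.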
@@ -254,35 +293,6 @@ def main():
if kafka_processor:
kafka_processor.heartbeat()
# NOTE: comment this block out to stop mixing in previous samples
if os.path.isfile(options.output):
prev_output, prevoutput_path = tempfile.mkstemp(".xml.gz", dir=os.getenv("_CONDOR_SCRATCH_DIR", tempfile.gettempdir()))
logging.info(f'Copying {options.output} to {prevoutput_path}')
shutil.copy(options.output, prevoutput_path)
_, zlpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(prevoutput_path, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
# Zero it out
zlpdf.zero_lag_lr_lnpdf.count.array[:] = 0.
# write out the file to disk
xmldoc = lw.ligolw.Document()
xmldoc.appendChild(lw.ligolw.LIGO_LW())
process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
far.gen_likelihood_control_doc(xmldoc, None, zlpdf)
process.set_end_time_now()
ligolw_utils.write_url(xmldoc, prevoutput_path, verbose = options.verbose, trap_signals = None)
# add previous output to marginalized data
data += far.RankingStatPDF.from_xml(xmldoc, u"gstlal_inspiral_likelihood")
if kafka_processor:
kafka_processor.heartbeat()
else:
prevoutput_path=""
logging.info(f"Previous output: {prevoutput_path}")
# apply density estimation and normalize the PDF
data.density_estimate_zero_lag_rates()
@@ -293,9 +303,13 @@ def main():
far.gen_likelihood_control_doc(xmldoc, None, data)
process.set_end_time_now()
ligolw_utils.write_filename(xmldoc, options.output, verbose = options.verbose)
ligolw_utils.write_filename(xmldoc, marg_pdf.files[0], verbose = options.verbose)
logging.info(f"Done marginalizing likelihoods.")
# we just created the bin-specific and marg DIST_STAT_PDFs,
# so the files definitely exist for the next iteration of the loop
files_exist = True
if kafka_processor:
kafka_processor.heartbeat()
@@ -59,6 +59,7 @@ import math
import multiprocessing
import numpy
import random
import scipy.optimize
from scipy import interpolate
import sys
import time
@@ -712,75 +713,47 @@ WHERE
return self
def new_with_extinction(self):
def new_with_extinction(self, verbose = False):
self = self.copy()
# the probability that an event survives clustering is the
# probability that no event with a higher value of the
# ranking statistic is within +/- dt of the event in
# question. .noise_lr_lnpdf.count contains an accurate
# model of the counts of pre-clustered events in each
# ranking statistic bin, but we know the events are not
# independent and so the total number of them cannot be
# used to form a Poisson rate for the purpose of computing
# the probability we desire. it has been found,
# empirically, that if the CCDF of pre-clustered background
# event counts is raised to some power and the clustering
# survival probability computed from that as though it were
# the CCDF of a Poisson process with some effective mean
# event rate, the result is a very good model for the
# post-clustering distribution of ranking statistics. we
# have two unknown parameters: the power to which to raise
# the pre-clustered ranking statistic's CCDF, and the mean
# event rate to assume. we solve for these parameters by
# fitting to the observed zero-lag ranking statistic
# histogram
x = self.noise_lr_lnpdf.bins[0].centres()
assert (x == self.zero_lag_lr_lnpdf.bins[0].centres()).all()
# some candidates are rejected by the ranking statistic,
# causing there to be a spike in the zero-lag density at ln
# L = -inf. if enough candidates get rejected this spike
# becomes the mode of the PDF which messes up the mask
# constructed below for the fit. we zero the first 40 bins
# here to prevent that from happening (assume density
# estimation kernel = 4 bins wide, with 10 sigma impulse
# length)
zl_counts = self.zero_lag_lr_lnpdf.array.copy()
zl_counts[:40] = 0.
if not zl_counts.any():
raise ValueError("zero-lag counts are all zero")
# get the noise counts
noise_counts = self.noise_lr_lnpdf.array.copy()
# get the zerolag counts.
# we model the tail of the distribution - top 0.1 - 1% - where
# clustering only affects the result at a < 1% level.
if zl_counts.sum() < 100 * 100:
tail_zl_counts = zl_counts.sum() * 0.99
else:
tail_zl_counts = zl_counts.sum() - 100
onepercent = zl_counts.cumsum().searchsorted(tail_zl_counts)
# normalize the counts
noise_counts /= noise_counts.sum()
zl_counts /= zl_counts.sum()
# compute survival probability
norm = zl_counts[onepercent] / noise_counts[onepercent]
zl_counts[onepercent:] = 0
noise_counts[onepercent:] = 0
survival_probability = zl_counts / noise_counts
survival_probability[onepercent:] = norm
survival_probability[numpy.isnan(survival_probability)] = 0.0
# apply to background counts and signal counts
self.noise_lr_lnpdf.array *= survival_probability
#FIXME: Add comprehensive explanation about the extinction model
lrs = self.noise_lr_lnpdf.centres()[0]
bg = self.noise_lr_lnpdf.array
fg = self.zero_lag_lr_lnpdf.array
# zero out the beginning bins of each because they are notoriously bad and should just be ignored
bg[:10] = 0.
fg[:10] = 0.
# fitting is done between ix_min and ix_max
fg_ccdf = numpy.cumsum(fg[::-1])[::-1]
ix_min = (fg_ccdf < fg_ccdf[0] / 2.).argmax()
ix_max = (fg_ccdf < fg_ccdf[0] / 100.).argmax()
bgtotal = bg[ix_min: ix_max + 1].sum()
bg_ccdf = numpy.cumsum(bg[::-1])[::-1] / bgtotal
# define a function for the extincted bg for scipy.optimize.curve_fit to call
def bg_ccdf_extinct_func(idx, c, A):
return numpy.log(A * bgtotal / c) + numpy.log1p(-numpy.exp(-1 * bg_ccdf[idx] * c))
# find the best fit c for extinction
c = scipy.optimize.curve_fit(bg_ccdf_extinct_func, range(ix_min, ix_max + 1), numpy.log(fg_ccdf[ix_min: ix_max + 1]), bounds = [0, numpy.inf], sigma = numpy.sqrt(1 / (bg_ccdf[ix_min: ix_max + 1] * bgtotal)))#, maxfev = 5000)
if verbose:
print(f"Best value of c is {c[0][0]}, A is {c[0][1]} with covariance {c[1]}", file = sys.stderr)
# calculate the extincted PDF
bg_pdf_extinct = c[0][1] * bg * numpy.exp(-1 * bg_ccdf * c[0][0])
self.noise_lr_lnpdf.array = bg_pdf_extinct
self.noise_lr_lnpdf.normalize()
self.signal_lr_lnpdf.array *= survival_probability
self.signal_lr_lnpdf.normalize()
#
# never allow PDFs that have had the extinction model
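Since the FIXME above is still open, one hedged reading of the fit (an editorial sketch, not the author's documentation): the zero-lag, post-clustering CCDF is modelled as an extincted copy of the background CCDF, fg_ccdf(x) ~ (A * bgtotal / c) * (1 - exp(-c * bg_ccdf(x))), and differentiating that relation with respect to the ranking statistic gives the extincted background PDF A * bg(x) * exp(-c * bg_ccdf(x)), which is the quantity stored in bg_pdf_extinct. A small self-contained toy reproduction of the fit, using made-up counts rather than gstlal data:

import numpy
from scipy.optimize import curve_fit

# toy stand-ins for the real histograms (assumed shapes, not gstlal output)
x = numpy.linspace(0.05, 29.95, 300)              # ln L bin centres
bg = 1e5 * numpy.exp(-x / 2.)                     # pre-clustering background counts per bin
bgtotal = bg.sum()
bg_ccdf = numpy.cumsum(bg[::-1])[::-1] / bgtotal  # normalized background CCDF

# fabricate a "zero-lag" CCDF from known extinction parameters, then fit them back
c_true, A_true = 50., 0.8
fg_ccdf = (A_true * bgtotal / c_true) * -numpy.expm1(-c_true * bg_ccdf)

def log_extincted_ccdf(bg_ccdf_vals, c, A):
	# same functional form as bg_ccdf_extinct_func above, parameterized by the CCDF value itself
	return numpy.log(A * bgtotal / c) + numpy.log1p(-numpy.exp(-bg_ccdf_vals * c))

(c_fit, A_fit), cov = curve_fit(log_extincted_ccdf, bg_ccdf, numpy.log(fg_ccdf), p0 = (10., 1.), bounds = (0., numpy.inf))

# the extincted background PDF, constructed as in new_with_extinction()
bg_pdf_extinct = A_fit * bg * numpy.exp(-c_fit * bg_ccdf)
print(c_fit, A_fit)  # should come back close to 50 and 0.8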