Commit 61baebb4 authored by Prathamesh Joshi

marginalize_likelihoods_online: Write noise PDF only if more than 99% of bins have undergone first round extinction
parent aeb7f963
@@ -119,7 +119,7 @@ def process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_sta
 	3. adds the two together
 	4. saves the sum to disk if the new PDF was successfully calculated
 	5. gets the zerolag for that bin and performs bin-specific (first round) extinction
-	6. returns the success status of the new PDF and the extincted PDF
+	6. returns the success status of the new PDF, extinction status of the old+new PDF, and the extincted PDF
 	"""
 	logger = logging.getLogger("marginalize_likelihoods_online")
@@ -158,12 +158,14 @@ def process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_sta
 		process.set_end_time_now()
 		ligolw_utils.write_url(xmldoc, pdf_path, verbose = verbose, trap_signals = None)
 
+	extinction_status = 0
 	if pdf:
 		# get the zerolag pdf for this bin and use it to perform bin-specific extinction
 		zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
 		pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
 		if pdf.ready_for_extinction():
 			# LR calculation has started and we are ready to perform first-round extinction
+			extinction_status = 1
 			pdf = pdf.new_with_extinction()
 		else:
 			# add a zeroed-out PDF instead, so that the template ids get added to data
@@ -172,7 +174,7 @@ def process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_sta
 			pdf.signal_lr_lnpdf.array[:] = 0.
 			pdf.zero_lag_lr_lnpdf.array[:] = 0.
 
-	return new_pdf_status, pdf
+	return new_pdf_status, extinction_status, pdf
 
 def url_from_registry(registry, path):
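
The new extinction_status flag is a plain 0/1 indicator, so summing it over bins counts how many underwent first-round extinction. A minimal self-contained sketch of the gating pattern follows; ToyPDF and extinction_gate are hypothetical stand-ins for far.RankingStatPDF and the logic above, not part of the commit:

class ToyPDF:
	"""Hypothetical stand-in for far.RankingStatPDF."""
	def __init__(self, ready):
		self.ready = ready

	def ready_for_extinction(self):
		# the real method checks whether enough LR samples have accumulated
		return self.ready

	def new_with_extinction(self):
		# the real method returns an extincted copy of the PDF
		return self

def extinction_gate(pdf):
	# mirrors the commit: the flag is 1 only when extinction actually ran
	extinction_status = 0
	if pdf is not None and pdf.ready_for_extinction():
		extinction_status = 1
		pdf = pdf.new_with_extinction()
	return extinction_status, pdf

# summing the 0/1 flags counts the bins that underwent first-round extinction
num_extincted = sum(extinction_gate(p)[0] for p in (ToyPDF(True), ToyPDF(False), ToyPDF(True)))
assert num_extincted == 2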
@@ -256,17 +258,19 @@ def main():
 	# generated here are all 0.
 	data = None
 	failed = deque(maxlen = len(registries))
+	num_extincted = 0 # number of bins for which we were able to perform first-round extinction
 
 	for reg in registries:
 		# process every svd bin, retry twice if it failed
 		for tries in range(3):
-			status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
+			status, extinction_status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
 			if status:
 				# add pdf to data
 				if data:
 					data += pdf
 				else:
 					data = pdf
+				num_extincted += extinction_status
 				break
 
 		if not status:
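
The surrounding loop follows a common try-N-times-then-defer pattern: each bin gets up to three attempts, and bins that never succeed are parked for a final retry pass. A generic sketch of that pattern; process_with_retries and its arguments are hypothetical helpers, not the script's API:

from collections import deque

def process_with_retries(items, process, max_tries = 3):
	# try each item up to max_tries times; items that never succeed
	# are collected in a deque for a final retry pass later
	failed = deque(maxlen = len(items))
	for item in items:
		for _ in range(max_tries):
			if process(item):
				break
		else:
			# loop ended without break: every attempt failed
			failed.append(item)
	return failed

# usage: only even numbers "succeed", so the odd ones end up deferred
leftover = process_with_retries(range(4), lambda n: n % 2 == 0)
assert list(leftover) == [1, 3]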
@@ -281,7 +285,7 @@ def main():
 	# retry registries that we failed to process the first time
 	# and remove from the deque upon success
 	for reg in list(failed):
-		status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
+		status, extinction_status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
 		if status:
 			logger.info(f"completed {reg} on final retry")
 			failed.remove(reg)
@@ -302,6 +306,8 @@ def main():
 			else:
 				data = pdf
+			num_extincted += extinction_status
 
 	if kafka_processor:
 		kafka_processor.heartbeat()
@@ -345,24 +351,27 @@ def main():
 	# apply density estimation and normalize the PDF
 	data.density_estimate_zero_lag_rates()
 
-	# write output document
-	xmldoc = ligolw.Document()
-	xmldoc.appendChild(ligolw.LIGO_LW())
-	process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
-	far.gen_likelihood_control_doc(xmldoc, None, data)
-	process.set_end_time_now()
-	ligolw_utils.write_filename(xmldoc, options.output, verbose = options.verbose)
-
-	# save the same file to the backup dir as a precaution
-	now = int(inspiral.now())
-	f = CacheEntry.from_T050017(options.output)
-	backup_dir = os.path.join("backup", os.path.dirname(options.output))
-	if not os.path.exists(backup_dir):
-		os.makedirs(backup_dir)
-	backup_fname = T050017_filename(f.observatory, f.description, (now, now), "xml.gz")
-	backup_fname = os.path.join(backup_dir, backup_fname)
-	ligolw_utils.write_filename(xmldoc, backup_fname, verbose = options.verbose)
+	# write the output document only if at least 99% of bins have been
+	# extincted and hence have contributed to the noise diststat PDF.
+	# Otherwise, the PDF will not be representative of the LRs across all bins.
+	if num_extincted >= 0.99 * len(registries):
+		xmldoc = ligolw.Document()
+		xmldoc.appendChild(ligolw.LIGO_LW())
+		process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
+		far.gen_likelihood_control_doc(xmldoc, None, data)
+		process.set_end_time_now()
+		ligolw_utils.write_filename(xmldoc, options.output, verbose = options.verbose)
+
+		# save the same file to the backup dir as a precaution
+		now = int(inspiral.now())
+		f = CacheEntry.from_T050017(options.output)
+		backup_dir = os.path.join("backup", os.path.dirname(options.output))
+		if not os.path.exists(backup_dir):
+			os.makedirs(backup_dir)
+		backup_fname = T050017_filename(f.observatory, f.description, (now, now), "xml.gz")
+		backup_fname = os.path.join(backup_dir, backup_fname)
+		ligolw_utils.write_filename(xmldoc, backup_fname, verbose = options.verbose)
 
 	logger.info(f"Done marginalizing likelihoods.")