#!/usr/bin/env python3
# Copyright (C) 2012,2014 Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
### This program runs gstlal_inspiral_marginalize_likelihood in a while True
### loop; see gstlal_inspiral_marginalize_likelihoods_online for more details.
###
### This program is not meant to be executed standalone by a user. It should be
### part of a DAG managing a running gstlal_inspiral online analysis.
###
### This program takes two or more arguments:
###
### 1. The path of the output file
### 2. One or more paths to text files each containing a single line giving
### the root URL of a web server from which to retrieve event parameter
### distribution data (e.g., "http://node001.ligo.caltech.edu")
###
### This program queries each running gstlal_inspiral job via the URL,
### computes PDFs of the likelihood ratio ranking statistics from the
### parameter distribution data, then marginalizes the ranking statistic PDFs
### across jobs and writes the result to the given output file.
###
### It continues to do that in an infinite loop, pausing 10 minutes between
### iterations. Files are not overwritten in place; each update is written
### to a temporary file and then moved into place with an mv operation, so
### that on a POSIX filesystem readers never see a partially written file.
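###
### As an illustration, that atomic-update pattern looks roughly like the
### sketch below (file names are hypothetical; the exact calls this program
### makes may differ):
###
###     fd, tmp_path = tempfile.mkstemp(suffix = ".xml", dir = ".")
###     os.close(fd)
###     ligolw_utils.write_filename(xmldoc, tmp_path)
###     os.rename(tmp_path, "marg_dist_stat_pdfs.xml")  # atomic on POSIX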
###
### Review status
### -------------
###
### +---------------------------------------------+---------------------------------------------+------------+
### | Names | Hash | Date |
### +=============================================+=============================================+============+
### | Florent, Jolien, Kipp, Chad | 0e96523b8846e5a4597ba3477c8462443470cd94 | 2015-05-15 |
### +---------------------------------------------+---------------------------------------------+------------+
###
from optparse import OptionParser
import sys
import time
import logging
import tempfile
import os
import shutil
import itertools
from gstlal.datafind import DataCache, DataType
from collections import deque
from urllib.error import URLError, HTTPError
from ligo import lw
from ligo.lw import ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from gstlal import events  # EventProcessor, used below for kafka heartbeat messages
from gstlal import far


def parse_command_line():
	parser = OptionParser()
	parser.add_option("--output-path", metavar = "path", help = "Set the path where the output PDFs are stored. Optional")
	parser.add_option("--registry", metavar = "filename", action = "append", help = "Path to a text file containing the root URL of a running gstlal_inspiral job's web management interface. Can be given multiple times; at least one is required.")
	parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
	parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
	parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
	options, filenames = parser.parse_args()

	if options.registry is None:
		raise ValueError("must provide at least one registry file.")

	return options


def calc_rank_pdfs(url, samples, num_cores, verbose = False):
	"""
	Create a RankingStatPDF from the RankingStat data served at the given
	url, retrying the download up to three times before giving up.
	Returns (1, pdf) on success and (0, None) on failure.
	"""
	tries = 0
	failed = 1
	while tries < 3:
		try:
			rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
			failed = 0
			break
		except (URLError, HTTPError) as e:
			logging.warning(f'Caught error while running calc rank pdfs: {e}.')
			tries += 1

	if not failed:
		lr_rankingstat = rankingstat.copy()
		lr_rankingstat.finish()
		rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
		return 1, rankingstatpdf

	# the download failed after three attempts
	return 0, None


def url_from_registry(registry, path):
	"""
	Parse a registry file and return the URL to the given path.
	"""
	with open(registry, "r") as f:
		server = f.read()
	server = server.replace("\n", "")
	logging.debug(f'server: {server}')

	return os.path.join(server, path)
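
# For example (illustrative values only, reusing the example URL from the
# header above): a registry file whose single line reads
# "http://node001.ligo.caltech.edu", combined with the path "rankingstat.xml",
# yields "http://node001.ligo.caltech.edu/rankingstat.xml".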


def main():
	options = parse_command_line()

	# set up logging; --verbose enables progress messages at INFO level
	log_level = logging.INFO if options.verbose else logging.WARNING
	logging.basicConfig(format = '%(asctime)s | marginalize_likelihoods: %(levelname)s : %(message)s')
	logging.getLogger().setLevel(log_level)

	registries = options.registry
	failed = deque(maxlen = len(registries))

	# options for generating ranking stat pdfs
	# get 10 million samples spread across the registries
	ranking_stat_samples = int(10000000 / len(registries))
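	# e.g., with 1000 registry files (an illustrative number), each job would
	# contribute 10000000 / 1000 = 10000 samples toward the 10 million total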

	#
	# set up the output paths
	#

	marg_pdf = DataCache.find(DataType.DIST_STAT_PDFS, root = options.output_path)
	pdfs = DataCache.find(DataType.DIST_STAT_PDFS, svd_bins = "*", root = options.output_path)

	if len(marg_pdf) == 1 and len(pdfs) == len(registries):
		files_exist = True
	elif len(marg_pdf) == 0 and len(pdfs) == 0:
		files_exist = False
	elif len(marg_pdf) == 1 and len(pdfs) != len(registries):
		raise ValueError(f"Number of registry files provided ({len(registries)}) does not match number of DIST_STAT_PDF files found ({len(pdfs)})")
	else:
		raise ValueError("Could not find marg DIST_STAT_PDF file")

	svd_bins = [reg[:4] for reg in registries]
	if files_exist:
		assert set(pdfs.groupby('svd_bin').keys()) == set(svd_bins), "svd bins of registry files are not the same as svd bins of found PDFs"
	else:
		marg_pdf = DataCache.generate(DataType.DIST_STAT_PDFS, options.ifo, root = options.output_path)
		pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, options.ifo, svd_bins = svd_bins, root = options.output_path)

	pdfs = pdfs.groupby('svd_bin')

	#
	# paths to data objects on each job's web management interface
	#

	likelihood_path = "rankingstat.xml"
	zerolag_counts_path = "zerolag_rankingstatpdf.xml"

	#
	# send heartbeat messages for the purpose of monitoring
	#

	if options.output_kafka_server:
		kafka_processor = events.EventProcessor(
			kafka_server = options.output_kafka_server,
			tag = options.tag,
			send_heartbeats = True,
			heartbeat_cadence = 60.,
			heartbeat_topic = f"gstlal.{options.tag}.marginalize_likelihoods_online_heartbeat"
		)
	else:
		kafka_processor = None

	#
	# pause on start up to make sure all inspiral jobs are running
	#

	sleep = 600
	logging.info(f"... sleeping for {sleep} seconds ...")
	time.sleep(sleep)

	#
	# loop forever
	#

	while True:
		# collect the current coinc parameter PDF data from each job, and
		# run some iterations of the ranking statistic sampler to generate
		# noise and signal model ranking statistic histograms for each job.
		# NOTE: the zero-lag ranking statistic histograms in the files
		# generated here are all 0.
		data = None
		for reg in registries:
			logging.info(f"Querying registry {reg}...")
			url = url_from_registry(reg, likelihood_path)
			svd_bin = reg[:4]

			# load the old ranking stat pdf for this bin:
			old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdfs[svd_bin][0], verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)) if files_exist else None

			# create the new ranking stat pdf and marginalize as we go
			new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
			add_to_data = 0
			if new_pdf_status and old_pdf:
				pdf += old_pdf
				add_to_data = 1
			elif new_pdf_status and not old_pdf:
				add_to_data = 1
			elif not new_pdf_status and old_pdf:
				pdf = old_pdf
				add_to_data = 1
				failed.append(reg)
			else:
				failed.append(reg)

			if add_to_data:
				# make sure the zerolag in the pdf is empty
				pdf.zero_lag_lr_lnpdf.count.array[:] = 0.

				if new_pdf_status:
					# save the new PDF + old PDF (if it exists) to disk before extinction
					xmldoc = lw.ligolw.Document()
					xmldoc.appendChild(lw.ligolw.LIGO_LW())
					process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
					far.gen_likelihood_control_doc(xmldoc, None, pdf)
					process.set_end_time_now()
					ligolw_utils.write_url(xmldoc, pdfs[svd_bin].files[0], verbose = options.verbose, trap_signals = None)

				# get the zerolag pdf for this bin and use it to perform bin-specific extinction
				zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
				pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")

				if data:
					data += pdf.new_with_extinction()
				else:
					data = pdf.new_with_extinction()

			# while looping through registries
			# send heartbeat messages
			if kafka_processor:
				kafka_processor.heartbeat()
		# if we fail to complete more than 1% of the bins,
		# this is a serious problem and we should just quit
		# and restart from scratch
		if len(failed) >= 0.01 * len(registries):
			logging.info(f"Failed to complete {len(failed)} registries out of {len(registries)}, exiting.")
			sys.exit(1)

		# otherwise, let's cut our losses and continue
		logging.info(f"Done with calc rank pdfs. Failed registries: {failed}")

		# sum the noise and signal model ranking statistic histograms
		# across jobs, and collect and sum the current observed zero-lag
		# ranking statistic histograms from all jobs. combine with the
		# noise and signal model ranking statistic histograms. NOTE: the
		# noise and signal model ranking statistic histograms in the
		# zero-lag counts files downloaded from the jobs must be all 0, and
		# the zero-lag counts in the output generated by
		# gstlal_inspiral_calc_rank_pdfs must be 0. NOTE: this is where
		# the zero-lag counts have the density estimation transform
		# applied.
		zerolag_counts_url = url_from_registry("gstlal_ll_inspiral_trigger_counter_registry.txt", zerolag_counts_path)

		# add zerolag counts url to marginalized data
		data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")

		# apply density estimation and normalize the PDF
		data.density_estimate_zero_lag_rates()

		# write output document
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
		far.gen_likelihood_control_doc(xmldoc, None, data)
		process.set_end_time_now()
		ligolw_utils.write_filename(xmldoc, marg_pdf.files[0], verbose = options.verbose)

		logging.info("Done marginalizing likelihoods.")

		# we just created the bin-specific and marg DIST_STAT_PDFs,
		# so the files definitely exist for the next iteration of the loop
		files_exist = True

		# pause before the next iteration, per the 10 minute cadence
		# described at the top of this file
		logging.info(f"... sleeping for {sleep} seconds ...")
		time.sleep(sleep)

	# not reached; the loop above runs forever
	sys.exit(1)


if __name__ == '__main__':
	main()