#!/usr/bin/env python3
# Copyright (C) 2012,2014 Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
### This program runs gstlal_inspiral_marginalize_likelihood in a while True
### loop; See gstlal_inspiral_marginalize_likelihoods_online for more details
###
###
### This program is not meant to be executed standalone by a user. It should be
### part of a DAG managing a running gstlal_inspiral online analysis.
###
### This program requires the following options:
###
### 1. --output: the path of the output file
### 2. --registry: one or more paths to text files, each containing a single
### line giving the root URL of a web server from which to retrieve event
### parameter distribution data (e.g., "http://node001.ligo.caltech.edu")
###
### This program queries each running gstlal_inspiral job via the URL,
### computes PDFs of the likelihood ratio ranking statistics from the
### parameter distribution data, then marginalizes the ranking statistic PDFs
### across jobs and writes the result to the given output file.
###
### It continues to do that in an infinite loop with a 10 minute pause on
### each iteration. Files are not overwritten directly, but rather via a
### temporary file and mv operation to ensure that no files are corrupted in
### a POSIX file environment.
###
### Review status
### -------------
###
### +---------------------------------------------+---------------------------------------------+------------+
### | Names | Hash | Date |
### +=============================================+=============================================+============+
### | Florent, Jolien, Kipp, Chad | 0e96523b8846e5a4597ba3477c8462443470cd94 | 2015-05-15 |
### +---------------------------------------------+---------------------------------------------+------------+
###
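###
### For example (filename illustrative; the first four characters of a
### registry filename are taken to be the SVD bin ID), each registry file
### holds a single line such as:
###
###   $ cat 0000_registry.txt
###   http://node001.ligo.caltech.edu
###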
from optparse import OptionParser
import sys
import time
import logging
import tempfile
import os
import shutil
import itertools
from collections import deque
from urllib.error import URLError, HTTPError
from ligo import lw
from ligo.lw import ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from lal.utils import CacheEntry
from gstlal import far
from gstlal import inspiral
from gstlal import events
from gstlal.datafind import DataCache, DataType, T050017_filename, DEFAULT_BACKUP_DIR
def parse_command_line():
parser = OptionParser()
parser.add_option("--output", metavar = "path", help = "Set the path where the output marginalized PDF is stored")
parser.add_option("--registry", metavar = "filename", action = "append", help = "")
parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
parser.add_option("--fast-burnin", action = "store_true", default = False, help = "Set whether to enable fast burn-in or not. If enabled, only 67% of bins are required to contribute to the marginalized PDF")
parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
options, filenames = parser.parse_args()
if options.output is None:
raise ValueError("must set --output.")
if options.registry is None:
raise ValueError("must provide at least one registry file.")
return options
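# Example invocation (program and file names are illustrative only; the
# --output filename must follow the T050017 convention so that
# CacheEntry.from_T050017() can parse it):
#
#   gstlal_inspiral_marginalize_likelihoods_online \
#       --output H1L1-GSTLAL_DIST_STAT_PDFS-0-0.xml.gz \
#       --registry 0000_registry.txt \
#       --registry 0001_registry.txt \
#       --num-cores 4 --tag online-test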
def calc_rank_pdfs(url, samples, num_cores, verbose = False):
"""
compute a Ranking Stat PDF from a url
"""
logger = logging.getLogger("marginalize_likelihoods_online")
try:
rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
except (URLError, HTTPError) as e:
logger.warning(f'Caught error while running calc rank pdfs: {e}.')
return 0, None
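# finish() modifies the statistic in place, so operate on a copy and leave
# the raw parameter distributions retrieved from the job untouched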
lr_rankingstat = rankingstat.copy()
lr_rankingstat.finish()
if lr_rankingstat.is_healthy():
rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
else:
# create an empty rankingstat pdf, so that the template ids
# from this bin get added to the marg rankingstat pdf
rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = 0, nthreads = num_cores, verbose = verbose)
# wait a bit so that we're not constantly firing HTTP
# requests to jobs at the start of an analysis
time.sleep(2)
return 1, rankingstatpdf
def process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, num_cores, verbose = False):
"""
This method does the following operations:
1. loads the old PDF for this bin
2. calls calc_rank_pdfs to compute the new PDF for this bin
3. adds the two together
4. saves the sum to disk if the new PDF was successfully calculated
5. gets the zerolag for that bin and performs bin-specific (first round) extinction
6. returns the success status of the new PDF, extinction status of the old+new PDF, and the extincted PDF
"""
logger = logging.getLogger("marginalize_likelihoods_online")
logger.info(f"Querying registry {reg}...")
url = url_from_registry(reg, likelihood_path)
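# registry filenames are assumed to begin with the 4-character SVD bin ID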
svd_bin = reg[:4]
pdf_path = pdfs[svd_bin].files[0]
if os.path.isfile(pdf_path):
# load the old ranking stat pdf for this bin:
_, old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdf_path, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
else:
logger.warning(f"Couldn't find {pdf_path}, starting from scratch")
old_pdf = None
# create the new ranking stat pdf and marginalize as we go
new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, num_cores, verbose = verbose)
# add the old and new pdfs if they are available
if new_pdf_status and old_pdf:
# both are available
pdf += old_pdf
if not new_pdf_status and old_pdf:
# only the old pdf is available, overwrite pdf with it
pdf = old_pdf
# make sure the zerolag in the pdf is empty
if pdf:
pdf.zero_lag_lr_lnpdf.count.array[:] = 0.
# save the new PDF + old PDF (if it exists) to disk
if new_pdf_status:
xmldoc = lw.ligolw.Document()
xmldoc.appendChild(lw.ligolw.LIGO_LW())
process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
far.gen_likelihood_control_doc(xmldoc, None, pdf)
process.set_end_time_now()
ligolw_utils.write_url(xmldoc, pdf_path, verbose = verbose, trap_signals = None)
extinction_status = 0
if pdf:
# get the zerolag pdf for this bin and use it to perform bin-specific extinction
zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
try:
pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
except (URLError, HTTPError) as e:
logger.warning(f'Caught error while trying to get zerolag for bin {svd_bin}: {e}.')
if pdf.ready_for_extinction():
# LR calculation has started and we are ready to perform first-round extinction
extinction_status = 1
pdf = pdf.new_with_extinction()
else:
# add a zeroed-out PDF instead, so that the template ids get added to data
logger.warning(f'Skipping first-round extinction for {pdfs[svd_bin].files[0]}, using an empty PDF instead')
pdf.noise_lr_lnpdf.array[:] = 0.
pdf.signal_lr_lnpdf.array[:] = 0.
pdf.zero_lag_lr_lnpdf.array[:] = 0.
return new_pdf_status, extinction_status, pdf
def url_from_registry(registry, path):
"""
parse a registry file and return
the URL to the given path on that server
"""
with open(registry, "r") as f:
server = f.read()
server = server.replace("\n", "")
logger = logging.getLogger("marginalize_likelihoods_online")
logger.info(f'server: {server}')
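# e.g. a registry containing "http://node001.ligo.caltech.edu" (the example
# given in the header) with path "rankingstat.xml" yields
# "http://node001.ligo.caltech.edu/rankingstat.xml"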
return os.path.join(server, path)
def main():
options = parse_command_line()
# set up logging
log_level = logging.INFO if options.verbose else logging.WARNING
logging.basicConfig(format = '%(asctime)s | marginalize_likelihoods_online: %(levelname)s : %(message)s')
logger = logging.getLogger("marginalize_likelihoods_online")
logger.setLevel(log_level)
registries = options.registry
# options for generating ranking stat pdfs
# draw 10 million samples in total, split evenly across the SVD bins
ranking_stat_samples = int(10000000 / len(registries))
#
# set up the output paths
#
svd_bins = [reg[:4] for reg in registries]
pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, CacheEntry.from_T050017(options.output).observatory, svd_bins = svd_bins)
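# one on-disk DIST_STAT_PDFS file per SVD bin; process_svd_bin() reads and
# rewrites these so the per-bin PDFs persist across iterations and restarts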
#
# paths to data objects on each job's web management interface
#
likelihood_path = "rankingstat.xml"
zerolag_counts_path = "zerolag_rankingstatpdf.xml"
#
# send heartbeat messages for the purpose of monitoring
#
if options.output_kafka_server:
kafka_processor = events.EventProcessor(
kafka_server=options.output_kafka_server,
tag=options.tag,
send_heartbeats = True,
heartbeat_cadence = 60.,
heartbeat_topic = f"gstlal.{options.tag}.marginalize_likelihoods_online_heartbeat"
)
else:
kafka_processor = None
#
# pause on start up to make sure all inspiral jobs are running
#
sleep = 600
logger.info(f"... sleeping for {sleep} seconds ...")
time.sleep(sleep)
#
# loop forever
#
while True:
# collect the current coinc parameter PDF data from each job, and
# run some iterations of the ranking statistic sampler to generate
# noise and signal model ranking statistic histograms for each job.
# NOTE: the zero-lag ranking statistic histograms in the files
# generated here are all 0.
data = None
failed = deque(maxlen = len(registries))
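# "data" accumulates the marginalized ranking statistic PDF across bins;
# "failed" collects registries whose jobs could not be processed this pass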
num_extincted = 0 # number of bins for which we were able to perform first-round extinction
for reg in registries:
# process every svd bin, retrying up to two more times on failure
for tries in range(3):
status, extinction_status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
if status:
# add pdf to data
if data:
data += pdf
else:
data = pdf
num_extincted += extinction_status
# success, no need to retry this registry
break
if not status:
logger.info(f"failed to complete {reg} during regular running")
failed.append(reg)
# while looping through registries
# send heartbeat messages
if kafka_processor:
kafka_processor.heartbeat()
# retry registries that we failed to process the first time
# and remove from the deque upon success
for reg in list(failed):
status, extinction_status, pdf = process_svd_bin(reg, likelihood_path, zerolag_counts_path, pdfs, ranking_stat_samples, options.num_cores, verbose = options.verbose)
if status:
logger.info(f"completed {reg} on final retry")
failed.remove(reg)
# add pdf to data
if data:
data += pdf
else:
data = pdf
else:
logger.info(f"failed to complete {reg} on final retry")
# add pdf to data anyway, as this will contain the extincted
# old pdf for this bin. Adding the old pdf for this bin is
# better than adding no pdf for this bin. Don't remove the
# registry from the failed deque
if pdf:
if data:
data += pdf
else:
data = pdf
num_extincted += extinction_status
# zero out the zerolag after the first round of extinction is finished
if data:
data.zero_lag_lr_lnpdf.count.array[:] = 0
# if we fail to complete 1% or more of the bins,
# this is a serious problem and we should just quit
# and restart from scratch
if len(failed) >= 0.01 * len(registries):
logger.critical(f"Failed to complete {len(failed)} registries out of {len(registries)}, exiting.")
sys.exit(1)
# otherwise, let's cut our losses and continue
logger.info(f"Done with calc rank pdfs. Failed registries: {failed}")
# sum the noise and signal model ranking statistic histograms
# across jobs, and collect and sum the current observed zero-lag
# ranking statistic histograms from all jobs. combine with the
# noise and signal model ranking statistic histograms. NOTE: the
# noise and signal model ranking statistic histograms in the
# zero-lag counts files downloaded from the jobs must be all 0, and
# the zero-lag counts in the output generated by
# gstlal_inspiral_calc_rank_pdfs must be 0. NOTE: this is where
# the zero-lag counts have the density estimation transform
# applied.
zerolag_counts_url = url_from_registry("gstlal_ll_inspiral_trigger_counter_registry.txt", zerolag_counts_path)
# add zerolag counts url to marginalized data
if data:
data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
else:
data = far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
# apply density estimation and normalize the PDF
data.density_estimate_zero_lag_rates()
# write output document only if 99% of bins have been extincted and
# hence have contributed to the noise diststat PDF. Otherwise, the
# PDF will not be representative of the LRs across all bins
# If fast burn-in is enabled, this value is changed to 67%. This is
# taken from how many bins are ready_for_extinction after a 2 week
# period in an HL EW analysis with 1000 templates / bin
# NOTE: This option is not meant to be used for analyses other than
# the one mentioned above.
if num_extincted >= 0.99 * len(registries) or (options.fast_burnin and num_extincted >= 0.666667 * len(registries)):
xmldoc = ligolw.Document()
xmldoc.appendChild(ligolw.LIGO_LW())
process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
far.gen_likelihood_control_doc(xmldoc, None, data)
process.set_end_time_now()
ligolw_utils.write_filename(xmldoc, options.output, verbose = options.verbose)
# save the same file to the backup dir as a precaution
now = int(inspiral.now())
f = CacheEntry.from_T050017(options.output)
backup_dir = os.path.join(DEFAULT_BACKUP_DIR, os.path.dirname(options.output))
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
backup_fname = T050017_filename(f.observatory, f.description, (now, now), "xml.gz")
backup_fname = os.path.join(backup_dir, backup_fname)
ligolw_utils.write_filename(xmldoc, backup_fname, verbose = options.verbose)
logger.info(f"Done marginalizing likelihoods.")
sys.exit(1)
if __name__ == '__main__':
main()