
Online new extinction

Merged Prathamesh Joshi requested to merge o4b-online-new-extinction into o4b-online
@@ -60,6 +60,7 @@ import shutil
import itertools
from gstlal import inspiral
from gstlal import events
from gstlal.datafind import DataCache, DataType
from collections import deque
from urllib.error import URLError, HTTPError
@@ -68,17 +69,19 @@ from ligo import lw
from ligo.lw import ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from lal.utils import CacheEntry
from gstlal import far
def parse_command_line():
parser = OptionParser()
parser.add_option("--output", metavar = "filename", help = "")
parser.add_option("--output", metavar = "path", help = "Set the path where the output marginalized PDF is stored")
parser.add_option("--registry", metavar = "filename", action = "append", help = "")
parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
options, filenames = parser.parse_args()
@@ -92,19 +95,28 @@ def parse_command_line():
def calc_rank_pdfs(url, samples, num_cores, verbose = False):
"""
load Ranking Stat PDF from a url
create a Ranking Stat PDF from a url
"""
try:
rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
except (URLError, HTTPError) as e:
logging.warning(f'Caught error while running calc rank pdfs: {e}.')
return 0, None
lr_rankingstat = rankingstat.copy()
lr_rankingstat.finish()
rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
tries = 0
failed = 1
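# try the download up to 3 times before giving up on this url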
while tries < 3:
try:
rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
failed = 0
break
except (URLError, HTTPError) as e:
logging.warning(f'Caught error while running calc rank pdfs: {e}.')
tries += 1
if not failed:
lr_rankingstat = rankingstat.copy()
lr_rankingstat.finish()
rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
return 1, rankingstatpdf
return 1, rankingstatpdf
else:
return 0, None
def url_from_registry(registry, path):
@@ -128,7 +140,6 @@ def main():
logging.basicConfig(format = '%(asctime)s | marginalize_likelihoods: %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
output = options.output
registries = options.registry
failed = deque(maxlen = len(registries))
@@ -137,6 +148,29 @@ def main():
# get 10 million samples in total, split evenly across the registries
ranking_stat_samples = int(10000000 / len(registries))
#
# set up the output paths
#
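# check whether the marginalized PDF and the per-bin DIST_STAT_PDF files already exist from an earlier iteration or run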
marg_pdf_exists = os.path.isfile(options.output)
pdfs = DataCache.find(DataType.DIST_STAT_PDFS, svd_bins = "*")
if marg_pdf_exists and len(pdfs) == len(registries):
files_exist = True
elif not marg_pdf_exists and len(pdfs) == 0:
files_exist = False
elif marg_pdf_exists and len(pdfs) != len(registries):
raise ValueError(f"Number of registry files provided ({len(registries)}) does not match number of DIST_STAT_PDF files found ({len(pdfs)})")
else:
raise ValueError("Could not find marg DIST_STAT_PDF file")
svd_bins = [reg[:4] for reg in registries]
if files_exist:
assert set(pdfs.groupby('svd_bin').keys()) == set(svd_bins), "svd bins of registry files are not the same as svd bins of found PDFs"
else:
pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, CacheEntry.from_T050017(options.output).observatory, svd_bins = svd_bins)
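# index the PDF cache by SVD bin so each registry's PDF file can be looked up directly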
pdfs = pdfs.groupby('svd_bin')
#
# paths to data objects on each job's web management interface
#
@@ -185,45 +219,63 @@ def main():
logging.info(f"Querying registry {reg}...")
url = url_from_registry(reg, likelihood_path)
# load ranking stat pdf and marginalize as we go
status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
if status:
if data:
data += pdf
else:
data = pdf
svd_bin = reg[:4]
# load the old ranking stat pdf for this bin:
old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdfs[svd_bin].files[0], verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))[1] if files_exist else None
# create the new ranking stat pdf and marginalize as we go
new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
add_to_data = 0
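# use the new PDF plus the old one when both exist, otherwise fall back to whichever is available; a registry whose new PDF could not be built is still recorded as failed even when the old PDF stands in for it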
if new_pdf_status and old_pdf:
pdf += old_pdf
add_to_data = 1
elif new_pdf_status and not old_pdf:
add_to_data = 1
elif not new_pdf_status and old_pdf:
pdf = old_pdf
add_to_data = 1
failed.append(reg)
else:
failed.append(reg)
# while looping through registries
# send heartbeat messages
if kafka_processor:
kafka_processor.heartbeat()
# retry registries that we failed to process the first time
# give each registry a maximum of 3 retries, and remove from
# the deque upon success
retry = 1
while retry <= 3 and failed:
for reg in list(failed):
url = url_from_registry(reg, likelihood_path)
# load ranking stat pdf and marginalize as we go
status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
if status:
logging.info(f"completed {reg} on retry: {retry}")
failed.remove(reg)
if add_to_data:
# make sure the zerolag in the pdf is empty
pdf.zero_lag_lr_lnpdf.count.array[:] = 0.
if new_pdf_status:
# save the new PDF + old PDF (if it exists) to disk before extinction
xmldoc = lw.ligolw.Document()
xmldoc.appendChild(lw.ligolw.LIGO_LW())
process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
far.gen_likelihood_control_doc(xmldoc, None, pdf)
process.set_end_time_now()
ligolw_utils.write_url(xmldoc, pdfs[svd_bin].files[0], verbose = options.verbose, trap_signals = None)
# get the zerolag pdf for this bin and use it to perform bin-specific extinction
zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
if len(numpy.nonzero(pdf.zero_lag_lr_lnpdf.array)[0]) > 0:
# LR calculation has started and we are ready to perform first-round extinction
if data:
data += pdf.new_with_extinction()
else:
data = pdf.new_with_extinction()
else:
logging.warning(f'Skipping first-round extinction for {reg}')
if data:
data += pdf
else:
data = pdf
else:
logging.info(f"failed to complete {reg} on retry: {retry}")
if kafka_processor:
kafka_processor.heartbeat()
retry += 1
# while looping through registries
# send heartbeat messages
if kafka_processor:
kafka_processor.heartbeat()
# zero out the zerolag after the first round of extinction is finished
data.zero_lag_lr_lnpdf.count.array[:] = 0
# if we fail to complete more than 1% of the bins,
# this is a serious problem and we should just quit
@@ -254,35 +306,6 @@ def main():
if kafka_processor:
kafka_processor.heartbeat()
# NOTE: comment this block out to stop mixing in samples from the previous output
if os.path.isfile(options.output):
prev_output, prevoutput_path = tempfile.mkstemp(".xml.gz", dir=os.getenv("_CONDOR_SCRATCH_DIR", tempfile.gettempdir()))
logging.info(f'Copying {options.output} to {prevoutput_path}')
shutil.copy(options.output, prevoutput_path)
_, zlpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(prevoutput_path, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
# Zero it out
zlpdf.zero_lag_lr_lnpdf.count.array[:] = 0.
# write out the file to disk
xmldoc = lw.ligolw.Document()
xmldoc.appendChild(lw.ligolw.LIGO_LW())
process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
far.gen_likelihood_control_doc(xmldoc, None, zlpdf)
process.set_end_time_now()
ligolw_utils.write_url(xmldoc, prevoutput_path, verbose = options.verbose, trap_signals = None)
# add previous output to marginalized data
data += far.RankingStatPDF.from_xml(xmldoc, u"gstlal_inspiral_likelihood")
if kafka_processor:
kafka_processor.heartbeat()
else:
prevoutput_path=""
logging.info(f"Previous output: {prevoutput_path}")
# apply density estimation and normalize the PDF
data.density_estimate_zero_lag_rates()
@@ -296,6 +319,10 @@ def main():
ligolw_utils.write_filename(xmldoc, options.output, verbose = options.verbose)
logging.info(f"Done marginalizing likelihoods.")
# we just created the bin-specific and marg DIST_STAT_PDFs,
# so the files definitely exist for the next iteration of the loop
files_exist = True
if kafka_processor:
kafka_processor.heartbeat()