Online new extinction

Merged Prathamesh Joshi requested to merge o4b-online-new-extinction into o4b-online
@@ -60,6 +60,7 @@ import shutil
import itertools
from gstlal import inspiral
from gstlal import events
from gstlal.datafind import DataCache, DataType
from collections import deque
from urllib.error import URLError, HTTPError
@@ -68,17 +69,19 @@ from ligo import lw
from ligo.lw import ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from lal.utils import CacheEntry
from gstlal import far
def parse_command_line():
    parser = OptionParser()
    parser.add_option("--output", metavar = "filename", help = "")
    parser.add_option("--output", metavar = "path", help = "Set the path where the output marginalized PDF is stored")
    parser.add_option("--registry", metavar = "filename", action = "append", help = "")
    parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
    parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
    parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
    parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
    parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
    options, filenames = parser.parse_args()
@@ -92,19 +95,28 @@ def parse_command_line():
def calc_rank_pdfs(url, samples, num_cores, verbose = False):
    """
    load Ranking Stat PDF from a url
    create a Ranking Stat PDF from a url
    """
    try:
        rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
    except (URLError, HTTPError) as e:
        logging.warning(f'Caught error while running calc rank pdfs: {e}.')
        return 0, None
    lr_rankingstat = rankingstat.copy()
    lr_rankingstat.finish()
    rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
    tries = 0
    failed = 1
    while tries < 3:
        try:
            rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
            failed = 0
            break
        except (URLError, HTTPError) as e:
            logging.warning(f'Caught error while running calc rank pdfs: {e}.')
            tries += 1
    • Comment on lines +102 to +109

      Maybe @rebecca.ewing can clarify, but I believe the retry is currently implemented so that we go through the sane bins as quickly as possible and come back to the bad bins at the end, instead of potentially getting stuck on a few bad bins at the beginning (e.g. the first 100 bins might each take 3 retries). That ordering can save time, since leaving the bad bins until the end gives them some time to recover their connection.

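A minimal sketch (not code from this MR) contrasting the two retry orderings discussed above; fetch(reg) is a hypothetical stand-in for downloading and marginalizing the ranking statistic from one registry's URL, and the function names are illustrative only.

# --- illustrative sketch, not part of the MR ---
from collections import deque
from urllib.error import URLError, HTTPError

def retry_immediately(registries, fetch, max_tries = 3):
    # retry each registry up to max_tries times on the spot,
    # as the new while-loop in calc_rank_pdfs does
    results = {}
    for reg in registries:
        for _ in range(max_tries):
            try:
                results[reg] = fetch(reg)
                break
            except (URLError, HTTPError):
                continue
    return results

def retry_at_the_end(registries, fetch, max_tries = 3):
    # one quick pass over all registries first, then revisit only the
    # failures, giving bad bins some time to recover their connection
    results = {}
    failed = deque()
    for reg in registries:
        try:
            results[reg] = fetch(reg)
        except (URLError, HTTPError):
            failed.append(reg)
    retry = 1
    while retry <= max_tries and failed:
        for reg in list(failed):
            try:
                results[reg] = fetch(reg)
                failed.remove(reg)
            except (URLError, HTTPError):
                pass
        retry += 1
    return results
# --- end of illustrative sketch ---

In the MR itself, the immediate retry lives inside calc_rank_pdfs (the while tries < 3 loop above this thread), and registries that still fail are appended to the failed deque in main().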
    if not failed:
        lr_rankingstat = rankingstat.copy()
        lr_rankingstat.finish()
        rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
        return 1, rankingstatpdf
    return 1, rankingstatpdf
    else:
        return 0, None
def url_from_registry(registry, path):
@@ -128,7 +140,6 @@ def main():
    logging.basicConfig(format = '%(asctime)s | marginalize_likelihoods: %(levelname)s : %(message)s')
    logging.getLogger().setLevel(log_level)
    output = options.output
    registries = options.registry
    failed = deque(maxlen = len(registries))
@@ -137,6 +148,14 @@ def main():
    # get 10 million samples
    ranking_stat_samples = int(10000000 / len(registries))
    #
    # set up the output paths
    #
    svd_bins = [reg[:4] for reg in registries]
    pdfs = DataCache.generate(DataType.DIST_STAT_PDFS, CacheEntry.from_T050017(options.output).observatory, svd_bins = svd_bins)
    pdfs = pdfs.groupby('svd_bin')
    #
    # paths to data objects on each job's web management interface
    #
@@ -185,45 +204,73 @@ def main():
        logging.info(f"Querying registry {reg}...")
        url = url_from_registry(reg, likelihood_path)
        # load ranking stat pdf and marginalize as we go
        status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
        if status:
            if data:
                data += pdf
            else:
                data = pdf
        svd_bin = reg[:4]
        if os.path.isfile(pdfs[svd_bin].files[0]):
            # load the old ranking stat pdf for this bin:
            _, old_pdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(pdfs[svd_bin].files[0], verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
        else:
            logging.warning(f"Couldn't find {pdfs[svd_bin].files[0]}, starting from scratch")
            old_pdf = None
        # create the new ranking stat pdf and marginalize as we go
        new_pdf_status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
        add_to_data = 0
        if new_pdf_status and old_pdf:
            pdf += old_pdf
            add_to_data = 1
        elif new_pdf_status and not old_pdf:
            add_to_data = 1
        elif not new_pdf_status and old_pdf:
            pdf = old_pdf
            add_to_data = 1
            failed.append(reg)
        else:
            failed.append(reg)
        # while looping through registries
        # send heartbeat messages
        if kafka_processor:
            kafka_processor.heartbeat()
    # retry registries that we failed to process the first time
    # give each registry a maximum of 3 retries, and remove from
    # the deque upon success
    retry = 1
    while retry <= 3 and failed:
        for reg in list(failed):
            url = url_from_registry(reg, likelihood_path)
            # load ranking stat pdf and marginalize as we go
            status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
            if status:
                logging.info(f"completed {reg} on retry: {retry}")
                failed.remove(reg)
        if add_to_data:
            # make sure the zerolag in the pdf is empty
            pdf.zero_lag_lr_lnpdf.count.array[:] = 0.
            if new_pdf_status:
                # save the new PDF + old PDF (if it exists) to disk before extinction
                xmldoc = lw.ligolw.Document()
                xmldoc.appendChild(lw.ligolw.LIGO_LW())
                process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
                far.gen_likelihood_control_doc(xmldoc, None, pdf)
                process.set_end_time_now()
                ligolw_utils.write_url(xmldoc, pdfs[svd_bin].files[0], verbose = options.verbose, trap_signals = None)
            # get the zerolag pdf for this bin and use it to perform bin-specific extinction
            zerolag_counts_url = url_from_registry(reg, zerolag_counts_path)
            pdf += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
            if pdf.ready_for_extinction():
                # LR calculation has started and we are ready to perform first-round extinction
                if data:
                    data += pdf.new_with_extinction()
                else:
                    data = pdf.new_with_extinction()
            else:
                # add a zeroed-out PDF instead, so that the template ids get added to data
                logging.warning(f'Skipping first-round extinction for {pdfs[svd_bin].files[0]}, using an empty PDF instead')
                pdf.noise_lr_lnpdf.array[:] = 0.
                pdf.signal_lr_lnpdf.array[:] = 0.
                pdf.zero_lag_lr_lnpdf.array[:] = 0.
                if data:
                    data += pdf
                else:
                    data = pdf
            else:
                logging.info(f"failed to complete {reg} on retry: {retry}")
            if kafka_processor:
                kafka_processor.heartbeat()
        retry += 1
        # while looping through registries
        # send heartbeat messages
        if kafka_processor:
            kafka_processor.heartbeat()
    # zero out the zerolag after the first round of extinction is finished
    if data:
        data.zero_lag_lr_lnpdf.count.array[:] = 0
    # if we fail to complete more than 1% of the bins,
    # this is a serious problem and we should just quit
@@ -249,40 +296,14 @@ def main():
    zerolag_counts_url = url_from_registry("gstlal_ll_inspiral_trigger_counter_registry.txt", zerolag_counts_path)
    # add zerolag counts url to marginalized data
    data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
    if data:
        data += far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
    else:
        data = far.RankingStatPDF.from_xml(ligolw_utils.load_url(zerolag_counts_url, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler), u"gstlal_inspiral_likelihood")
    if kafka_processor:
        kafka_processor.heartbeat()
    # NOTE comment this to unmix in previous samples
    if os.path.isfile(options.output):
        prev_output, prevoutput_path = tempfile.mkstemp(".xml.gz", dir=os.getenv("_CONDOR_SCRATCH_DIR", tempfile.gettempdir()))
        logging.info(f'Copying {options.output} to {prevoutput_path}')
        shutil.copy(options.output, prevoutput_path)
        _, zlpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(prevoutput_path, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
        # Zero it out
        zlpdf.zero_lag_lr_lnpdf.count.array[:] = 0.
        # write out the file to disk
        xmldoc = lw.ligolw.Document()
        xmldoc.appendChild(lw.ligolw.LIGO_LW())
        process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], paramdict = {})
        far.gen_likelihood_control_doc(xmldoc, None, zlpdf)
        process.set_end_time_now()
        ligolw_utils.write_url(xmldoc, prevoutput_path, verbose = options.verbose, trap_signals = None)
        # add previous output to marginalized data
        data += far.RankingStatPDF.from_xml(xmldoc, u"gstlal_inspiral_likelihood")
        if kafka_processor:
            kafka_processor.heartbeat()
    else:
        prevoutput_path=""
    logging.info(f"Previous output: {prevoutput_path}")
    # apply density estimation and normalize the PDF
    data.density_estimate_zero_lag_rates()