Commit c10011bd authored by Surabhi Sachdev's avatar Surabhi Sachdev Committed by Patrick Godwin
Browse files

Add pre-merger time to process params table

parent 35035ce9
......@@ -284,6 +284,7 @@ def parse_command_line():
group.add_option("--gracedb-label", action = "append", help = "Labels to apply to gracedb uploads. Can be applied multiple times.")
group.add_option("--gracedb-service-url", metavar = "url", default = gracedb_default_service_url, help = "Override default GracedB service url (optional, default is %s)." % gracedb_default_service_url)
group.add_option("--delay-uploads", action = "store_true", help = "Choose whether to delay uploads to allow aggregation of events downstream from many gstlal_inspiral jobs (default is False).")
group.add_option("--upload-time-before-merger", action = "store_true", help = "If stored in SVD banks, option to upload the time before merger at which each filter terminates in process params table")
parser.add_option_group(group)
group = OptionGroup(parser, "Program Behaviour")
......@@ -758,6 +759,9 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
else:
reconstruction_segment_list = None
if options.upload_time_before_merger is not None:
# Create template id / time before merger map
options.upload_time_before_merger = {row.template_id: abs(row.end.gpsSeconds) + 1e-9*abs(row.end.gpsNanoSeconds) for bank in banks.items()[0][-1] for row in bank.sngl_inspiral_table}
@bottle.route("/bank.txt")
def get_filter_length_and_chirpmass(banks = banks):
......@@ -892,8 +896,9 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
cap_singles = options.cap_singles,
FAR_trialsfactor = options.far_trials_factor,
activation_counts = options.activation_counts,
verbose = options.verbose,
track_latency = options.data_source in ("lvshm", "framexmit"),
template_id_time_map = options.upload_time_before_merger,
verbose = options.verbose
)
if options.verbose:
print("... pipeline handler initialized", file=sys.stderr)
......
......@@ -745,7 +745,7 @@ class Handler(simplehandler.Handler):
dumps of segment information, trigger files and background
distribution statistics.
"""
def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, sngls_snr_threshold = None, tag = "", kafka_server = "10.14.0.112:9092", cluster = False, cap_singles = False, FAR_trialsfactor = 1.0, activation_counts = None, track_latency = False, verbose = False):
def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, sngls_snr_threshold = None, tag = "", kafka_server = "10.14.0.112:9092", cluster = False, cap_singles = False, FAR_trialsfactor = 1.0, activation_counts = None, track_latency = False, template_id_time_map = None, verbose = False):
"""!
@param mainloop The main application's event loop
@param pipeline The gstreamer pipeline that is being
......@@ -773,6 +773,7 @@ class Handler(simplehandler.Handler):
self.cap_singles = cap_singles
self.FAR_trialsfactor = FAR_trialsfactor
self.activation_counts = activation_counts
self.template_id_time_map = template_id_time_map
self.gracedbwrapper = gracedbwrapper
# FIXME: detangle this
......@@ -1238,7 +1239,7 @@ class Handler(simplehandler.Handler):
if not self.stream_thinca.push(instrument, [event for event in events if event.ifo == instrument], buf_timestamp):
continue
flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = self.rankingstat.fast_path_cut_from_triggers, cluster = self.cluster, cap_singles = self.cap_singles, FAR_trialsfactor = self.FAR_trialsfactor)
flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = self.rankingstat.fast_path_cut_from_triggers, cluster = self.cluster, cap_singles = self.cap_singles, FAR_trialsfactor = self.FAR_trialsfactor,template_id_time_map = self.template_id_time_map)
self.coincs_document.commit()
# do GraceDB alerts and update eye candy
......@@ -1463,7 +1464,7 @@ class Handler(simplehandler.Handler):
# whatever triggers remain in the queues, and processes
# them
flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = self.rankingstat.fast_path_cut_from_triggers, flush = True, cluster = self.cluster, cap_singles = self.cap_singles, FAR_trialsfactor = self.FAR_trialsfactor)
flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = self.rankingstat.fast_path_cut_from_triggers, flush = True, cluster = self.cluster, cap_singles = self.cap_singles, FAR_trialsfactor = self.FAR_trialsfactor, template_id_time_map = self.template_id_time_map)
self.coincs_document.commit()
# do GraceDB alerts
......
......@@ -234,6 +234,7 @@ class StreamThinca(object):
def set_xmldoc(self, xmldoc, process_id):
self.coinc_tables = thinca.InspiralCoincTables(xmldoc, thinca.InspiralCoincDef)
self.sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc)
self.process_params_table = lsctables.ProcessParamsTable.get_table(xmldoc)
self.last_coincs = last_coincs(xmldoc)
self.process_id = process_id
self.time_slide_graph = snglcoinc.TimeSlideGraph(
......@@ -255,7 +256,7 @@ class StreamThinca(object):
return self.time_slide_graph.push(instrument, events, t_complete)
def pull(self, rankingstat, fapfar = None, zerolag_rankingstatpdf = None, coinc_sieve = None, flush = False, cluster = False, cap_singles = False, FAR_trialsfactor = 1.0):
def pull(self, rankingstat, fapfar = None, zerolag_rankingstatpdf = None, coinc_sieve = None, flush = False, cluster = False, cap_singles = False, FAR_trialsfactor = 1.0, template_id_time_map = None):
# NOTE: rankingstat is not used to compute the ranking
# statistic, it supplies the detector livetime segment
# lists to determine which triggers are eligible for
......@@ -344,7 +345,11 @@ class StreamThinca(object):
self.last_coincs.add(events, coinc, coincmaps, coinc_inspiral)
self.sngl_inspiral_table.extend([sngl_trigger for sngl_trigger in events if sngl_trigger.event_id not in self.clustered_sngl_ids])
self.clustered_sngl_ids |= set(e.event_id for e in events)
if template_id_time_map is not None:
# The same template should have the same offset regardless of ifo, so just take the first one
offset = [template_id_time_map[int(sngl_trigger.Gamma0)] for sngl_trigger in events][0]
row = [row for row in self.process_params_table if row.param == u'--upload-time-before-merger'][0]
row.value=str(offset)
# add selected singles to the noise model
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment