Commit a51250d5 authored by Qi Chu
Browse files

Remove a delay for early-warning (EW) pipelines caused by clustering

parent c76f1627
......@@ -274,6 +274,11 @@ def parse_command_line():
raise ValueError("only coherent searches are supported: must process data from at least two antennae")
# Get the banks and make the detectors
# format of iir_banks : [{'H1': <H1Bank0>; 'L1': <L1Bank0>..;}
# {'H1': <H1Bank1>; 'L1': <L1Bank1>..;}
# ...]
iir_banks = [spiir_utils.parse_iirbank_string(iirbank) for iirbank in options.iir_bank]
# FIXME: should also check for read permissions
......@@ -467,6 +472,11 @@ postcohsrcs = spiirparts.mkPostcohSPIIROnline(
cohfar_assignfar_input_fname = options.cohfar_assignfar_input_fname
# or "%s-%s_Postcoh-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
)
# Get the negative_latency value from the first bank file of the first detector.
# The iir_banks structure has already been validated in mkPostcohSPIIROnline,
# so indexing the first detector/bank here is safe.
bank_dict0 = iir_banks[0]
# next(iter(...)) picks an arbitrary first detector key; works on both
# Python 2 and Python 3 (dict.keys()[0] fails on Python 3, where keys()
# returns a non-indexable view object).
this_bankname = bank_dict0[next(iter(bank_dict0))][0]
neg_lat = spiir_utils.get_negative_from_xml(this_bankname)
finalsink = postcoh_finalsink.FinalSink(
channel_dict = detectors.channel_dict,
......@@ -495,6 +505,7 @@ finalsink = postcoh_finalsink.FinalSink(
gracedb_service_url = options.finalsink_gracedb_service_url,
gracedb_offline_annote = options.finalsink_gracedb_offline_annote,
output_skymap = options.cuda_postcoh_output_skymap,
negative_latency = neg_lat,
verbose = options.verbose)
......
......@@ -326,7 +326,7 @@ class FAPUpdater(object):
collected_fnames.append("%s/%s" % (self.path, one_bank_fname))
class FinalSink(object):
def __init__(self, channel_dict, process_params, pipeline, need_online_perform, path, output_prefix, output_name, far_factor, cluster_window = 0.5, snapshot_interval = None, fapupdater_interval = None, cohfar_accumbackground_output_prefix = None, cohfar_accumbackground_output_name = None, fapupdater_output_fname = None, fapupdater_collect_walltime_string = None, singlefar_veto_thresh = 0.01, chisq_ratio_veto_thresh = 8.0, gracedb_far_threshold = None, gracedb_group = "Test", gracedb_search = "LowMass", gracedb_pipeline = "spiir", gracedb_service_url = "https://gracedb.ligo.org/api/", gracedb_offline_annote = None, output_skymap = 0, superevent_thresh = 3.8e-7, opa_cohsnr_thresh = 8, verbose = False):
def __init__(self, channel_dict, process_params, pipeline, need_online_perform, path, output_prefix, output_name, far_factor, cluster_window = 0.5, snapshot_interval = None, fapupdater_interval = None, cohfar_accumbackground_output_prefix = None, cohfar_accumbackground_output_name = None, fapupdater_output_fname = None, fapupdater_collect_walltime_string = None, singlefar_veto_thresh = 0.01, chisq_ratio_veto_thresh = 8.0, gracedb_far_threshold = None, gracedb_group = "Test", gracedb_search = "LowMass", gracedb_pipeline = "spiir", gracedb_service_url = "https://gracedb.ligo.org/api/", gracedb_offline_annote = None, output_skymap = 0, superevent_thresh = 3.8e-7, opa_cohsnr_thresh = 8, negative_latency = 0, verbose = False):
#
# initialize
#
......@@ -342,6 +342,7 @@ class FinalSink(object):
self.cluster_window = cluster_window
self.candidate = None
self.cluster_boundary = None
self.negative_latency = negative_latency
self.need_candidate_check = False
self.cur_event_table = lsctables.New(postcoh_table_def.PostcohInspiralTable)
self.chisq_ratio_thresh = chisq_ratio_veto_thresh
......@@ -485,8 +486,9 @@ class FinalSink(object):
self.t_start = buf_timestamp
self.is_first_buf = False
headevent_endtime = buf_timestamp + self.negative_latency
if self.is_first_event and nevent > 0:
self.cluster_boundary = buf_timestamp + self.cluster_window
self.cluster_boundary = headevent_endtime + self.cluster_window
self.is_first_event = False
# extend newevents to cur_event_table
......@@ -498,9 +500,9 @@ class FinalSink(object):
# NOTE: only consider clustered trigger for uploading to gracedb
# check if the newevents is over boundary
# this loop will exit when the cluster_boundary is incremented to be > the buf_timestamp, see plot in self.cluster()
# this loop will exit when the cluster_boundary is incremented to be > the headevent_endtime, see diagram in self.cluster()
while self.cluster_window > 0 and self.cluster_boundary and buf_timestamp > self.cluster_boundary:
while self.cluster_window > 0 and self.cluster_boundary and headevent_endtime > self.cluster_boundary:
self.cluster(self.cluster_window)
if self.need_candidate_check:
......@@ -546,14 +548,14 @@ class FinalSink(object):
def cluster(self, cluster_window):
# send candidate to be gracedb checked only when:
# time ->->->->
# |buf_timestamp
# timestamp small ->->->-> large
# |headevent_endtime
# ___________(cur_table)
# |boundary
# |candidate to be gracedb checked = peak of cur_table < boundary
# |candidate remain = peak of cur_table > boundary
# |candidate to be gracedb checked = end time of the peak of cur_table < boundary
# |candidate remain = end time of the peak of cur_table > boundary
# afterwards:
# |buf_timestamp
# |headevent_endtime
# ____(cur_table cleaned)
# |boundary incremented
......@@ -567,8 +569,10 @@ class FinalSink(object):
self.candidate = None # so we can reselect a candidate next time
return
# the first event in cur_event_table
# FIXME: SPEEDUP
peak_event = self.__select_head_event()
# find the max cohsnr event within the boundary of cur_event_table
# FIXME: SPEEDUP
for row in self.cur_event_table:
if row.end <= self.cluster_boundary and row.cohsnr > peak_event.cohsnr:
peak_event = row
......@@ -581,12 +585,15 @@ class FinalSink(object):
return
if peak_event.end <= self.cluster_boundary and peak_event.cohsnr > self.candidate.cohsnr:
# if peak_event.cohsnr > candidate.cohsnr, slide window so the centre
# becomes the peak_event
self.candidate = peak_event
iterutils.inplace_filter(lambda row: row.end > self.cluster_boundary, self.cur_event_table)
# update boundary
self.cluster_boundary = self.candidate.end + cluster_window
self.need_candidate_check = False
else:
else:
# if peak_event.cohsnr < candidate.cohsnr, pop out candidate for gracedb uploading
iterutils.inplace_filter(lambda row: row.end > self.cluster_boundary, self.cur_event_table)
# update boundary
self.cluster_boundary = self.cluster_boundary + cluster_window
......
......@@ -1034,6 +1034,7 @@ class Bank(object):
self.matches = []
self.flower = None
self.epsilon = None
self.negative_latency = 0
def build_from_tmpltbank(self, filename, sampleRate = None, negative_latency = 0, padding = 1.3, approximant = 'SpinTaylorT4', waveform_domain = "FD", epsilon_start = 0.02, epsilon_min = 0.001, epsilon_max = None, epsilon_factor = 2, filters_min = 0, filters_max = None, filters_per_loglen_min = 0, filters_per_loglen_max = None, initial_overlap_min = 0, b0_optimized_overlap_min = 0, final_overlap_min = 0, initial_overlap_max = 1, b0_optimized_overlap_max = 1, final_overlap_max = 1, nround_max = 10, alpha = .99, beta = 0.25, pnorder = 4, flower = 15, snr_cut = 0.998, all_psd = None, autocorrelation_length = 201, downsample = False, optimizer_options = {}, verbose = False, contenthandler = DefaultContentHandler):
"""
......@@ -1054,7 +1055,7 @@ class Bank(object):
self.epsilon = epsilon_start
self.alpha = alpha
self.beta = beta
self.negative_latency = negative_latency
if sampleRate is None:
fFinal = max(sngl_inspiral_table.getColumnByName("f_final"))
......@@ -1414,6 +1415,7 @@ class Bank(object):
root.appendChild(param.Param.build('epsilon', types.FromPyType[float], self.epsilon))
root.appendChild(param.Param.build('alpha', types.FromPyType[float], self.alpha))
root.appendChild(param.Param.build('beta', types.FromPyType[float], self.beta))
root.appendChild(param.Param.build('negative_latency', types.FromPyType[int], self.negative_latency))
# FIXME: ligolw format now supports complex-valued data
root.appendChild(array.Array.build('autocorrelation_bank_real', self.autocorrelation_bank.real))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment