diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral index 60fcd6a662674b9e572dcfad732476494c891067..7a7de4ef78a60248b2086c3274080ac1aca608b6 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral +++ b/gstlal-inspiral/bin/gstlal_inspiral @@ -283,7 +283,6 @@ def parse_command_line(): group.add_option("--job-tag", metavar = "tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc.; may not contain \".\" nor \"-\".") group.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data, performs downsampling, and stores to local filespace. ") group.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.") - group.add_option("--thinca-interval", metavar = "seconds", type = "float", default = 30.0, help = "Set the thinca interval (default = 30 s).") group.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.") group.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).") group.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.") @@ -823,7 +822,6 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, rankingstatpdf_url = options.ranking_stat_pdf, zerolag_rankingstatpdf_url = zerolag_rankingstat_pdf, likelihood_snapshot_interval = options.likelihood_snapshot_interval, - thinca_interval = options.thinca_interval, min_log_L = options.min_log_L, sngls_snr_threshold = options.singles_threshold, tag = options.job_tag, diff --git a/gstlal-inspiral/python/far.py b/gstlal-inspiral/python/far.py index a1fa2f6cca5b104abf580028a8b10a826961ff2b..32283bae0f52f15b82a9530586717c87c7781b70 100644 --- a/gstlal-inspiral/python/far.py +++ b/gstlal-inspiral/python/far.py @@ -136,17 +136,17 @@ class RankingStat(snglcoinc.LnLikelihoodRatioMixin): self.denominator = inspiral_lr.LnNoiseDensity(template_ids = template_ids, instruments = instruments, delta_t = delta_t, min_instruments = min_instruments) self.zerolag = inspiral_lr.LnLRDensity(template_ids = template_ids, instruments = instruments, delta_t = delta_t, min_instruments = min_instruments) -# def __call__(self, **kwargs): -# # FIXME FIXME FIXME This fast path should be reenabled when we -# # know it scales well. It is currently implemented in the -# # streamthinca ntuple compare func -# # fast-path: network SNR cut -# #if sum(snr**2. for snr in kwargs["snrs"].values()) < self.network_snrsq_threshold: -# # return NegInf -# # full ln L ranking stat. 
we define the ranking statistic -# # to be the largest ln L from all allowed subsets of -# # triggers -# return max(super(RankingStat, self).__call__(**kwargs) for kwargs in kwarggen(min_instruments = self.min_instruments, **kwargs)) + def __call__(self, **kwargs): + # fast-path: network SNR cut + if sum(snr**2. for snr in kwargs["snrs"].values()) < self.network_snrsq_threshold: + return NegInf + # full ln L ranking stat. we define the ranking statistic + # to be the largest ln L from all allowed subsets of + # triggers + # FIXME: temporarily disabled due to performance concerns. + # just chain to parent class + return super(RankingStat, self).__call__(**kwargs) + #return max(super(RankingStat, self).__call__(**kwargs) for kwargs in kwarggen(min_instruments = self.min_instruments, **kwargs)) @property def template_ids(self): diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py index 8d7317d7391d17bffefcacf35574956184d37353..c94f89fbcdf5bf16576b1105d413db60f263b808 100644 --- a/gstlal-inspiral/python/lloidhandler.py +++ b/gstlal-inspiral/python/lloidhandler.py @@ -46,7 +46,6 @@ # -import bisect from collections import deque try: from fpconst import NaN @@ -120,18 +119,6 @@ def subdir_from_T050017_filename(fname): return path -def lower_bound_in_seglist(seglist, x): - """ - Return the index of the segment immediately at or before x in the - coalesced segment list seglist. Operation is O(log n). - """ - # NOTE: this is an operation that is performed in a number of - # locations in various codes, and is something that I've screwed up - # more than once. maybe this should be put into segments.py itself - i = bisect.bisect_right(seglist, x) - return i - 1 if i else 0 - - # # ============================================================================= # @@ -587,7 +574,7 @@ class Handler(simplehandler.Handler): dumps of segment information, trigger files and background distribution statistics. """ - def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, thinca_interval = 50.0, min_log_L = None, sngls_snr_threshold = None, tag = "", kafka_server = "10.14.0.112:9092", verbose = False): + def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, min_log_L = None, sngls_snr_threshold = None, tag = "", kafka_server = "10.14.0.112:9092", verbose = False): """! 
@param mainloop The main application's event loop @param pipeline The gstreamer pipeline that is being @@ -638,20 +625,14 @@ class Handler(simplehandler.Handler): bottle.route("/sngls_snr_threshold.txt", method = "POST")(self.web_set_sngls_snr_threshold) bottle.route("/zerolag_rankingstatpdf.xml")(self.web_get_zerolag_rankingstatpdf) - # - # all triggers up to this index have had their SNR time - # series deleted to save memory - # - - self.snr_time_series_cleanup_index = 0 - # # attach a StreamThinca instance to ourselves # self.stream_thinca = streamthinca.StreamThinca( - coincidence_threshold = rankingstat.delta_t, - thinca_interval = thinca_interval, # seconds + coincs_document.xmldoc, + coincs_document.process_id, + delta_t = rankingstat.delta_t, min_instruments = rankingstat.min_instruments, min_log_L = min_log_L, sngls_snr_threshold = sngls_snr_threshold @@ -696,24 +677,24 @@ class Handler(simplehandler.Handler): if self.ranking_stat_input_url is not None: if self.rankingstat.is_healthy(self.verbose): - self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish() + self.stream_thinca.ln_lr_from_triggers = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish().ln_lr_from_triggers if self.verbose: print >>sys.stderr, "ranking statistic assignment ENABLED" else: - del self.stream_thinca.rankingstat + self.stream_thinca.ln_lr_from_triggers = None if self.verbose: print >>sys.stderr, "ranking statistic assignment DISABLED" elif min_log_L is not None: - self.stream_thinca.rankingstat = far.DatalessRankingStat( + self.stream_thinca.ln_lr_from_triggers = far.DatalessRankingStat( template_ids = rankingstat.template_ids, instruments = rankingstat.instruments, min_instruments = rankingstat.min_instruments, delta_t = rankingstat.delta_t - ).finish() + ).finish().ln_lr_from_triggers if self.verbose: print >>sys.stderr, "ranking statistic assignment ENABLED" else: - del self.stream_thinca.rankingstat + self.stream_thinca.ln_lr_from_triggers = None if self.verbose: print >>sys.stderr, "ranking statistic assignment DISABLED" @@ -779,6 +760,17 @@ class Handler(simplehandler.Handler): name = "%s_ht_gate" % instrument elem = pipeline.get_by_name(name) if elem is None: + # FIXME: if there is no data for an + # instrument for which we have ranking + # statistic data then the horizon distance + # record needs to indicate that it was off + # for the entire segment. should probably + # 0 the horizon distance history at start + # and stop of each stream, but there is a + # statement in the EOS handler that we + # don't do that, and I can't remember why + # that is.
+ continue raise ValueError("cannot find \"%s\" element for %s" % (name, instrument)) if verbose: print >>sys.stderr, "\tfound %s for %s" % (name, instrument) @@ -1002,11 +994,11 @@ class Handler(simplehandler.Handler): # update streamthinca's ranking statistic # data if self.rankingstat.is_healthy(self.verbose): - self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish() + self.stream_thinca.ln_lr_from_triggers = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish().ln_lr_from_triggers if self.verbose: print >>sys.stderr, "ranking statistic assignment ENABLED" else: - del self.stream_thinca.rankingstat + self.stream_thinca.ln_lr_from_triggers = None if self.verbose: print >>sys.stderr, "ranking statistic assignment DISABLED" @@ -1035,102 +1027,21 @@ class Handler(simplehandler.Handler): snr_min = self.rankingstat.snr_min self.rankingstat.denominator.triggerrates[instrument].add_ratebin(map(float, buf_seg), len([event for event in events if event.snr >= snr_min])) - # extract times when instruments were producing - # SNR. used to define "on instruments" for coinc - # tables among other things. will only need - # segment information for the times for which we - # have triggers, so use stream_thinca's discard - # boundary and a bisection search to clip the lists - # to reduce subsequent operation count. - discard_boundary = float(self.stream_thinca.discard_boundary) - snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items()) - - # times when SNR was available. used only for code - # correctness checks - one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1) - # FIXME: this is needed to work around rounding - # problems in safety checks below, trying to - # compare GPS trigger times to float segment - # boundaries (the boundaries don't have enough - # precision to know if triggers near the edge are - # in or out). it would be better not to have to - # screw around like this. - one_or_more_instruments.protract(1e-3) # 1 ms - - # times when at least 2 instruments were generating - # SNR. used to sieve triggers for inclusion in the - # denominator. - two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2) - # FIXME: see comment above. - two_or_more_instruments.protract(1e-3) # 1 ms - - # run stream thinca. update the ranking - # statistic's denominator's histograms from sngls - # that weren't used in zero-lag multi-instrument - # coincs. NOTE: we rely on the arguments to - # .chain() being evaluated in left-to-right order - # so that .add_events() is evaluated before - # .last_coincs because the former initializes the - # latter. we skip singles collected during times - # when only one instrument was on. NOTE: the - # definition of "on" is blurry since we can recover - # triggers with end times outside of the available - # strain data, but the purpose of this test is - # simply to prevent signals occuring during - # single-detector times from contaminating our - # noise model, so it's not necessary for this test - # to be super precisely defined. 
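The "on instruments" bookkeeping being removed here, and reimplemented in StreamThinca.pull() later in this patch, is built on segmentsUtils.vote(). A minimal sketch of the semantics, using made-up instruments and segment values (only the vote() and protract() calls are taken from the code in this patch):

	# illustrative only:  the instruments and segment values are hypothetical
	from ligo import segments
	from ligo.segments import utils as segmentsUtils

	snr_segments = segments.segmentlistdict({
		"H1": segments.segmentlist([segments.segment(0., 100.)]),
		"L1": segments.segmentlist([segments.segment(50., 150.)]),
	})

	# vote(seglists, n) = times when at least n of the lists are "on"
	one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1)	# [0, 150)
	two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2)	# [50, 100)

	# pad the boundaries by 1 ms, as the handler does, so that GPS
	# trigger times compared against float segment boundaries don't
	# fall out due to round-off
	two_or_more_instruments.protract(1e-3)

	assert segments.segment(60., 70.) in two_or_more_instruments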
- for event in itertools.chain(self.stream_thinca.add_events(self.coincs_document.xmldoc, self.coincs_document.process_id, events, buf_timestamp, snr_segments, fapfar = self.fapfar), self.stream_thinca.last_coincs.single_sngl_inspirals() if self.stream_thinca.last_coincs else ()): - if self.ranking_stat_output_url is None: - continue - assert event.end in one_or_more_instruments, "trigger at time (%s) when there was no SNR (%s)" % (str(event.end), str(one_or_more_instruments)) - if event.end in two_or_more_instruments: - self.rankingstat.denominator.increment(event) - self.coincs_document.commit() - - # update zero-lag bin counts in rankingstat. - if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id): - self.rankingstat.zerolag.increment(event) - - # Cluster last coincs before recording number of - # zero lag events or sending alerts to gracedb - # FIXME Do proper clustering that saves states - # between thinca intervals and uses an independent - # clustering window. This can also go wrong if - # there are multiple events with an identical - # likelihood. It will just choose the event with - # the highest event id - if self.stream_thinca.last_coincs: - self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)]) - - # Add events to the observed likelihood histogram - # post "clustering" - # FIXME proper clustering is really needed (see - # above) - if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1 - - # do GraceDB alerts and update eye candy - self.__do_gracedb_alerts() - self.eye_candy.update(events, self.stream_thinca.last_coincs) - - # after doing alerts, no longer need per-trigger - # SNR data for the triggers that are too old to be - # used to form candidates. to avoid searching the - # entire list of triggers each time, we stop when - # we encounter the first trigger whose SNR series - # might still be needed, save its index, and start - # the search from there next time - discard_boundary = self.stream_thinca.discard_boundary - for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index): - if event.end >= discard_boundary: - break - del event.snr_time_series + # run stream thinca. + if self.stream_thinca.push(instrument, events, buf_timestamp): + flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = lambda events: sum(event.snr**2. 
for event in events) < self.rankingstat.network_snrsq_threshold) + self.coincs_document.commit() + + # do GraceDB alerts and update eye candy + self.__do_gracedb_alerts(self.stream_thinca.last_coincs) + self.eye_candy.update(events, self.stream_thinca.last_coincs) + + # after doing alerts, no longer need + # per-trigger SNR data for the triggers + # that are too old to be used to form + # candidates. + for event in flushed_sngls: + del event.snr_time_series def _record_horizon_distance(self, instrument, timestamp, horizon_distance): @@ -1310,78 +1221,24 @@ class Handler(simplehandler.Handler): def __flush(self): - # run StreamThinca's .flush(). returns the last remaining - # non-coincident sngls. add them to the distribution. as - # above in appsink_new_buffer() we skip singles collected - # during times when only one instrument was one. - - # times when at least 2 instruments were generating SNR. - # used to sieve triggers for inclusion in the denominator. - discard_boundary = float(self.stream_thinca.discard_boundary) - snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items()) - one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1) - # FIXME: see comment in appsink_new_buffer() - one_or_more_instruments.protract(1e-3) # 1 ms - two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2) - # FIXME: see comment in appsink_new_buffer() - two_or_more_instruments.protract(1e-3) # 1 ms - - for event in self.stream_thinca.flush(self.coincs_document.xmldoc, self.coincs_document.process_id, snr_segments, fapfar = self.fapfar): - if self.ranking_stat_output_url is None: - continue - assert event.end in one_or_more_instruments, "trigger at time (%s) with no SNR (%s)" % (str(event.end), str(one_or_more_instruments)) - if event.end in two_or_more_instruments: - self.rankingstat.denominator.increment(event) - self.coincs_document.commit() + # run StreamThinca in flush mode. forms candidates from + # whatever triggers remain in the queues, and processes + # them - # update zero-lag bin counts in rankingstat. - if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id): - self.rankingstat.zerolag.increment(event) - - # Cluster last coincs before recording number of zero - # lag events or sending alerts to gracedb - # FIXME Do proper clustering that saves states between - # thinca intervals and uses an independent clustering - # window. This can also go wrong if there are multiple - # events with an identical likelihood.
It will just - # choose the event with the highest event id - if self.stream_thinca.last_coincs: - self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)]) - - # Add events to the observed likelihood histogram post - # "clustering" - # FIXME proper clustering is really needed (see above) - if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1 + flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, flush = True) + self.coincs_document.commit() # do GraceDB alerts - self.__do_gracedb_alerts() + self.__do_gracedb_alerts(self.stream_thinca.last_coincs) # after doing alerts, no longer need per-trigger SNR data # for the triggers that are too old to be used to form - # candidates. to avoid searching the entire list of - # triggers each time, we stop when we encounter the first - # trigger whose SNR series might still be needed, save its - # index, and start the search from there next time - discard_boundary = self.stream_thinca.discard_boundary - for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index): - if event.end >= discard_boundary: - break + # candidates. + for event in flushed_sngls: del event.snr_time_series - # garbage collect last_coincs - # FIXME: this should not be needed. something is wrong. - # if this is needed, then why don't we have to garbage - # collect everything ourselves? - self.stream_thinca.last_coincs = {} - - def __do_gracedb_alerts(self): + def __do_gracedb_alerts(self, last_coincs): # check for no-op if self.rankingstatpdf is None or not self.rankingstatpdf.is_healthy(): return @@ -1392,7 +1249,7 @@ class Handler(simplehandler.Handler): assert self.fapfar is not None # do alerts - self.gracedbwrapper.do_alerts(self.stream_thinca.last_coincs, self.psds, self.__get_rankingstat_xmldoc) + self.gracedbwrapper.do_alerts(last_coincs, self.psds, self.__get_rankingstat_xmldoc) def web_get_sngls_snr_threshold(self): @@ -1454,4 +1311,10 @@ class Handler(simplehandler.Handler): coincs_document = self.coincs_document.get_another() self.__write_output_url(url = fname, verbose = verbose) self.coincs_document = coincs_document - self.snr_time_series_cleanup_index = 0 + # NOTE: this operation requires stream_thinca to + # have been flushed by calling .pull() with flush = + # True. the .__write_output_url() code path does + # that, but there is no check here to ensure that + # remains true so be careful if you are editing + # these methods. 
+ self.stream_thinca.set_xmldoc(self.coincs_document.xmldoc, self.coincs_document.process_id) diff --git a/gstlal-inspiral/python/streamthinca.py b/gstlal-inspiral/python/streamthinca.py index 7648574a8fbe088e989855f6e6c55b8ad415c01b..de24105382a9638bc2f185326a085edfe5b14a7b 100644 --- a/gstlal-inspiral/python/streamthinca.py +++ b/gstlal-inspiral/python/streamthinca.py @@ -41,15 +41,18 @@ # -import itertools +import bisect +import time -from glue import iterutils -from ligo import segments +from glue.ligolw import ligolw from glue.ligolw import lsctables +from gstlal import snglinspiraltable import lal +from lalburst import snglcoinc from lalinspiral import thinca -from gstlal import snglinspiraltable +from ligo import segments +from ligo.segments import utils as segmentsUtils # @@ -77,296 +80,326 @@ class SnglInspiral(snglinspiraltable.GSTLALSnglInspiral): # # ============================================================================= # -# StreamThinca +# last_coincs helper # # ============================================================================= # # -# on-the-fly thinca implementation +# the last_coincs machine, which is used to construct documents for upload +# to gracedb, used to be implemented by a class in thinca.py which was +# originally written for this purpose. when the coincidence engine +# switched to a native streaming design, that older implementation was no +# longer suitable because it indexes the entire document. we have had to +# replace that implementation with this one, here. the old one is now unused, +# but it's potentially useful so it has been left behind and this one put +# here instead. # -class StreamThinca(object): - def __init__(self, coincidence_threshold, thinca_interval = 50.0, min_instruments = 2, min_log_L = None, sngls_snr_threshold = None): - self.thinca_interval = thinca_interval # seconds - self.last_coincs = {} - if min_instruments < 1: - raise ValueError("min_instruments (=%d) must be >= 1" % min_instruments) - self.min_instruments = min_instruments - self.min_log_L = min_log_L - self.sngls_snr_threshold = sngls_snr_threshold - self.sngl_inspiral_table = None - self.ln_likelihood_func = None +class last_coincs(object): + def __init__(self, xmldoc): + # + # find all tables + # - # the \Delta t window not including the light travel time - self.coincidence_threshold = coincidence_threshold + self.process_table = lsctables.ProcessTable.get_table(xmldoc) + self.process_params_table = lsctables.ProcessParamsTable.get_table(xmldoc) + self.sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc) + self.coinc_def_table = lsctables.CoincDefTable.get_table(xmldoc) + self.coinc_event_table = lsctables.CoincTable.get_table(xmldoc) + self.coinc_inspiral_table = lsctables.CoincInspiralTable.get_table(xmldoc) + self.coinc_event_map_table = lsctables.CoincMapTable.get_table(xmldoc) + self.time_slide_table = lsctables.TimeSlideTable.get_table(xmldoc) - # allow .discard_boundary to return meaningful results - # before the first set of triggers are processed - self.coincidence_back_off = self.max_dt + # + # index the process, process params, and time_slide tables + # - # upper boundary of interval spanned by last invocation - self.last_boundary = -segments.infinity() + self.process_index = dict((row.process_id, row) for row in self.process_table) + self.process_params_index = {} + for row in self.process_params_table: + self.process_params_index.setdefault(row.process_id, []).append(row) + self.time_slide_index = {} + for row in self.time_slide_table: +
self.time_slide_index.setdefault(row.time_slide_id, []).append(row) - # set of the event ids of triggers currently in ram that - # have already been used in coincidences - self.event_ids = set() + # + # find the sngl_inspiral<-->sngl_inspiral coinc_definer + # + self.coinc_def, = (row for row in self.coinc_def_table if row.search == thinca.InspiralCoincDef.search and row.search_coinc_type == thinca.InspiralCoincDef.search_coinc_type) - def set_rankingstat(self, rankingstat): - if rankingstat is None: - self.ln_likelihood_func = None - else: - self.ln_likelihood_func = rankingstat.ln_lr_from_triggers - def del_rankingstat(self): - self.ln_likelihood_func = None - rankingstat = property(None, set_rankingstat, del_rankingstat, "RankingStat instance with which to compute likelihood ratio values.") + # + # coinc event metadata + # + self.sngl_inspiral_index = {} + self.coinc_event_index = {} + self.coinc_event_maps_index = {} + self.coinc_inspiral_index = {} - @property - def max_dt(self): - """ - Upper bound on the time that can separate two triggers and - they still pass coincidence, not including time shifts. - """ - # add 10% to coincidence window for safety + the - # light-crossing time of the Earth - return 1.1 * self.coincidence_threshold + 2. * lal.REARTH_SI / lal.C_SI + def add(self, events, coinc, coincmaps, coinc_inspiral): + # FIXME: these checks might be over-doing it. I just + # don't want any surprises, but maybe when we are confident + # the code works they should be removed + assert coinc.coinc_def_id == self.coinc_def.coinc_def_id + assert coinc.coinc_event_id == coinc_inspiral.coinc_event_id + assert coinc.process_id in self.process_index + assert coinc.time_slide_id in self.time_slide_index + assert all(event.process_id == coinc.process_id for event in events) + assert all(coinc_event_map.coinc_event_id == coinc.coinc_event_id for coinc_event_map in coincmaps) + assert set(event.event_id for event in events) == set(coinc_event_map.event_id for coinc_event_map in coincmaps) + self.sngl_inspiral_index[coinc.coinc_event_id] = events + self.coinc_event_index[coinc.coinc_event_id] = coinc + self.coinc_event_maps_index[coinc.coinc_event_id] = coincmaps + self.coinc_inspiral_index[coinc.coinc_event_id] = coinc_inspiral - @property - def discard_boundary(self): - """ - After invoking .run_coincidence(), triggers prior to this - time are no longer required. - """ - return self.last_boundary - self.coincidence_back_off + def clear(self): + self.sngl_inspiral_index.clear() + self.coinc_event_index.clear() + self.coinc_event_maps_index.clear() + self.coinc_inspiral_index.clear() - def add_events(self, xmldoc, process_id, events, boundary, seglists, fapfar = None): - """ - NOTE: upon the first invocation of .add_events() some - initialization occurs using xmldoc. no other xmldoc object - may be used on any subsequent invocation until .flush() is - invoked or else the behaviour is undefined. The code used - to have safety checks to monitor for changes to the xmldoc - tree but it was found that that confused the garbage - collector and led to memory leaks, so for now it is left as - an exercise for the calling code to ensure it follows the - rules. - """ - # we need our own copy of the sngl_inspiral table because - # we need a place to store a history of all the triggers, - # and a place we can run coincidence on them. 
when making - # our copy, we can't use table.new_from_template() because - # we need to ensure we have a Table subclass, not a DBTable - # subclass - if self.sngl_inspiral_table is None: - self.sngl_inspiral_table = lsctables.New(lsctables.SnglInspiralTable, lsctables.SnglInspiralTable.get_table(xmldoc).columnnames) - # How far apart two singles can be and still be - # coincident, including time slide offsets. - offsetvectors = lsctables.TimeSlideTable.get_table(xmldoc).as_dict() - self.coincidence_back_off = max(map(abs, offsetvectors.values())) + self.max_dt - self.zero_lag_time_slide_ids = frozenset(time_slide_id for time_slide_id, offsetvector in offsetvectors.items() if not any(offsetvector.values())) - - # append the new row objects to our sngl_inspiral table - for event in events: - assert event.end >= self.last_boundary, "boundary failure: encountered event preceding previous boundary: %s < %s" % (str(event.end), str(self.last_boundary)) - self.sngl_inspiral_table.append(event) - - # run coincidence, return non-coincident sngls. - return self.run_coincidence(xmldoc, process_id, boundary, seglists, fapfar = fapfar) - - - def run_coincidence(self, xmldoc, process_id, boundary, seglists, fapfar = None): - """ - boundary = the time up-to which the trigger list can be - assumed to be complete. - """ - # - # Notes on time intervals: - # - # ... -------)[----------------------)[----------------- ... - # - # ^ ^ - # | last boundary | boundary - # ^ ^ - # |last_bound-back_off | boundary-back_off - # [----------------------) - # coinc segment (times of earliest single) - # ^ - # | discard all singles up - # to here when done - # - # We know all singles up-to boundary; from boundary and on - # the list might be incomplete. A concidence can involve - # triggers separated by as much as coincidence_back_off - # (including time slide offsets). Therefore, on this - # iteration coincs whose earliest trigger is not later than - # (boundary-coincidence_back_off) are complete; coincs - # whose earliest trigger occurs on or after - # (boundary-coincidence_back_off) might be incomplete (we - # might form new doubles or doubles might be promoted to - # triples, and so on, when we fill in the singles list - # later). - # - # Therefore, if for the purpose of this code we define the - # "time" of a coinc by the time of its earliest single, - # then on this iteration, we will construct all coincs with - # times in [last_boundary-coincidence_back_off, - # boundary-coincidence_back_off). When done, singles that - # precede (boundary-coincidence_back_off), are no longer - # required since all coincs that can involve those triggers - # have been obtained on this iteration. - # - # invalidate the coinc extractor in case all that follows - # is a no-op - self.last_coincs = {} - - # check that we've accumulated thinca_interval seconds, and - # that .add_events() has been called with some events since - # the last flush - if self.last_boundary + self.thinca_interval > boundary or self.sngl_inspiral_table is None: - return [] - - # we need our own copy of this table to figure out what - # triggers have been newly used in coincs. 
this does not - # store any state across invocations so we create it on the - # fly - coinc_event_map_table = lsctables.New(lsctables.CoincMapTable) - - # replace tables with our versions - real_sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc) - real_coinc_event_map_table = lsctables.CoincMapTable.get_table(xmldoc) - real_sngl_inspiral_table.parentNode.replaceChild(self.sngl_inspiral_table, real_sngl_inspiral_table) - real_coinc_event_map_table.parentNode.replaceChild(coinc_event_map_table, real_coinc_event_map_table) - - # define once-off ntuple_comparefunc() so we can pass the - # coincidence segment in as a default value for the seg - # keyword argument and so that we can cut out single detector - # events with an SNR less than 5. Less than SNR 5 triggers - # will never produce an log LR greater than 4, so we can - # safely discard them. - def ntuple_comparefunc(events, offset_vector, seg = segments.segment(self.last_boundary - self.coincidence_back_off, boundary - self.coincidence_back_off)): - # False/0 = keep, True/non-0 = discard - # FIXME FIXME FIXME. Hardcodes a network SNR threshold - # of 7. This will be removed and a more permanent - # solution will be sought in the likelihood code - if sum(e.snr**2 for e in events) < 49.: - return True - return min(event.end for event in events) not in seg - - # find coincs. - thinca.ligolw_thinca( - xmldoc, - process_id = process_id, - delta_t = self.coincidence_threshold, - ntuple_comparefunc = ntuple_comparefunc, - seglists = seglists, - likelihood_func = self.ln_likelihood_func, - fapfar = fapfar, - min_instruments = self.min_instruments, - min_log_L = self.min_log_L - ) + def sngl_inspirals(self, coinc_event_id): + return self.sngl_inspiral_index[coinc_event_id] - # construct a coinc extractor from the XML document while - # the tree still contains our internal table objects - self.last_coincs = thinca.sngl_inspiral_coincs(xmldoc) - - # put the original table objects back - self.sngl_inspiral_table.parentNode.replaceChild(real_sngl_inspiral_table, self.sngl_inspiral_table) - coinc_event_map_table.parentNode.replaceChild(real_coinc_event_map_table, coinc_event_map_table) - - # copy triggers into real output document - # FIXME: the "min log L" cut is applied inside - # ligolw_thinca, above, and coincs that failed it have had - # their singles put back into the "non-coincident singles" - # pile. this creates a bias in the final ranking statistic - # in that it makes it appear as though singles in the part - # of parameter space where the min log L cut fails occur at - # a higher rate than they really do, making triggers that - # fall outside the region appear to be more rare, and thus - # more statistically significant, than they really are. - # the effect is small because coincidences are rare, it - # shifts the density by about 1%. sometime before O3 we - # should rewrite all of this coincidence machinery with an - # eye to higher performance, and when we do we should be - # sure to get this sort of stuff right. - if coinc_event_map_table: - # figure out the IDs of triggers that have been - # used in coincs for the first time, and update the - # set of IDs of all triggers that have been used in - # coincs - index = dict((row.event_id, row) for row in self.sngl_inspiral_table) - self.event_ids &= set(index) - newids = set(coinc_event_map_table.getColumnByName("event_id")) - self.event_ids - self.event_ids |= newids - - # find multi-instrument zero-lag coinc event IDs. 
- # to avoid quadratic scaling, we use - # coinc_event_map to identify newly-created coincs; - # these will all be at the end of the coinc_event - # table, so we take rows from the end until one is - # encountered whose ID is not in the - # coinc_event_map table, then bail out - coinc_event_ids = frozenset(coinc_event_map_table.getColumnByName("coinc_event_id")) - zero_lag_multi_instrument_coinc_event_ids = frozenset(row.coinc_event_id for row in itertools.takewhile(lambda row: row.coinc_event_id in coinc_event_ids, reversed(lsctables.CoincTable.get_table(xmldoc))) if row.nevents >= 2 and row.time_slide_id in self.zero_lag_time_slide_ids) - - # singles used in coincs but not in zero-lag coincs - # with two or more instruments. these will be added to - # the "non-coincident singles" list before returning - background_coinc_sngl_ids = set(coinc_event_map_table.getColumnByName("event_id")) - set(row.event_id for row in coinc_event_map_table if row.coinc_event_id in zero_lag_multi_instrument_coinc_event_ids) - background_coinc_sngls = map(index.__getitem__, background_coinc_sngl_ids) - - # copy rows into target tables. - for event_id in newids: - real_sngl_inspiral_table.append(index[event_id]) - map(real_coinc_event_map_table.append, coinc_event_map_table) - else: - background_coinc_sngls = [] - - # record boundary - self.last_boundary = boundary - - # remove triggers that are too old to be useful from our - # internal sngl_inspiral table. save any that were never - # used in coincidences - discard_boundary = self.discard_boundary - noncoinc_sngls = [row for row in self.sngl_inspiral_table if row.end < discard_boundary and row.event_id not in self.event_ids] - iterutils.inplace_filter(lambda row: row.end >= discard_boundary, self.sngl_inspiral_table) - - # save all sngls above the requested sngls SNR threshold - # (all sngls that participated in coincs are already in the - # document, so only need to check for ones in the - # non-coincident sngls list for this iteration) - if self.sngls_snr_threshold is not None: - for event in noncoinc_sngls: - if event.snr >= self.sngls_snr_threshold: - real_sngl_inspiral_table.append(event) - # return sngls that were not used in multi-instrument - # zero-lag coincidences - return noncoinc_sngls + background_coinc_sngls + def __iter__(self): + return iter(self.coinc_event_index) - def flush(self, xmldoc, process_id, seglists, fapfar = None): - # coincidence. 
- noncoinc_sngls = self.run_coincidence(xmldoc, process_id, segments.infinity(), seglists, fapfar = fapfar) + def __nonzero__(self): + return bool(self.coinc_event_index) - # any event that hasn't been used in a coinc by now will - # never be - if self.sngl_inspiral_table is not None: - noncoinc_sngls.extend(row for row in self.sngl_inspiral_table if row.event_id not in self.event_ids) - self.sngl_inspiral_table.unlink() - self.sngl_inspiral_table = None - self.event_ids.clear() - # last_boundary must be reset to -infinity so that it looks - # like a fresh copy of the stream thinca instance - self.last_boundary = -segments.infinity() + def __getitem__(self, coinc_event_id): + newxmldoc = ligolw.Document() + ligolw_elem = newxmldoc.appendChild(ligolw.LIGO_LW()) + + # when making these, we can't use the .copy() method of Table + # instances because we need to ensure we have a Table + # subclass, not a DBTable subclass + new_process_table = ligolw_elem.appendChild(lsctables.New(lsctables.ProcessTable, self.process_table.columnnames)) + new_process_params_table = ligolw_elem.appendChild(lsctables.New(lsctables.ProcessParamsTable, self.process_params_table.columnnames)) + new_sngl_inspiral_table = ligolw_elem.appendChild(lsctables.New(lsctables.SnglInspiralTable, self.sngl_inspiral_table.columnnames)) + new_coinc_def_table = ligolw_elem.appendChild(lsctables.New(lsctables.CoincDefTable, self.coinc_def_table.columnnames)) + new_coinc_event_table = ligolw_elem.appendChild(lsctables.New(lsctables.CoincTable, self.coinc_event_table.columnnames)) + new_coinc_inspiral_table = ligolw_elem.appendChild(lsctables.New(lsctables.CoincInspiralTable, self.coinc_inspiral_table.columnnames)) + new_coinc_event_map_table = ligolw_elem.appendChild(lsctables.New(lsctables.CoincMapTable, self.coinc_event_map_table.columnnames)) + new_time_slide_table = ligolw_elem.appendChild(lsctables.New(lsctables.TimeSlideTable, self.time_slide_table.columnnames)) + + new_coinc_def_table.append(self.coinc_def) + coincevent = self.coinc_event_index[coinc_event_id] + new_time_slide_table.extend(self.time_slide_index[coincevent.time_slide_id]) + + new_sngl_inspiral_table.extend(self.sngl_inspiral_index[coinc_event_id]) + new_coinc_event_table.append(coincevent) + new_coinc_event_map_table.extend(self.coinc_event_maps_index[coinc_event_id]) + new_coinc_inspiral_table.append(self.coinc_inspiral_index[coinc_event_id]) + + for process_id in set(new_sngl_inspiral_table.getColumnByName("process_id")) | set(new_coinc_event_table.getColumnByName("process_id")) | set(new_time_slide_table.getColumnByName("process_id")): + # process row is required + new_process_table.append(self.process_index[process_id]) + try: + new_process_params_table.extend(self.process_params_index[process_id]) + except KeyError: + # process_params rows are optional + pass + + return newxmldoc + + +# +# ============================================================================= +# +# StreamThinca +# +# ============================================================================= +# + + +def lower_bound_in_seglist(seglist, x): + """ + Return the index of the segment immediately at or before x in the + coalesced segment list seglist. Operation is O(log n). + """ + # NOTE: this is an operation that is performed in a number of + # locations in various codes, and is something that I've screwed up + # more than once.
maybe this should be put into segments.py itself + i = bisect.bisect_right(seglist, x) + return i - 1 if i else 0 + + +class StreamThinca(object): + def __init__(self, xmldoc, process_id, delta_t, min_instruments = 2, min_log_L = None, sngls_snr_threshold = None): + self.ln_lr_from_triggers = None + self.delta_t = delta_t + self.min_instruments = min_instruments + self.min_log_L = min_log_L + self.sngls_snr_threshold = sngls_snr_threshold + self.set_xmldoc(xmldoc, process_id) + + + def set_xmldoc(self, xmldoc, process_id): + self.coinc_tables = thinca.InspiralCoincTables(xmldoc, thinca.InspiralCoincDef) + self.sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc) + self.last_coincs = last_coincs(xmldoc) + self.process_id = process_id + self.time_slide_graph = snglcoinc.TimeSlideGraph( + thinca.coincgen_doubles, + lsctables.TimeSlideTable.get_table(xmldoc).as_dict(), + self.delta_t, + min_instruments = self.min_instruments + ) + + + def push(self, instrument, events, t_complete): + """ + Push new triggers into the coinc engine. Returns True if + the coinc engine's internal state has changed in a way that + might enable new candidates to be constructed, False if + not. + """ + return self.time_slide_graph.push(instrument, events, t_complete) + + + def pull(self, rankingstat, fapfar = None, zerolag_rankingstatpdf = None, coinc_sieve = None, flush = False): + # NOTE: rankingstat is not used to compute the ranking + # statistic; it supplies the detector livetime segment + # lists that determine which triggers are eligible for + # inclusion in the background model, and it is the + # destination for those triggers. self.ln_lr_from_triggers + # is the ranking statistic function (if set). + + # extract times when instruments were producing SNR. used + # to define "on instruments" for coinc tables, as a safety + # check for impossible triggers, and to identify triggers + # suitable for use in defining the background PDFs. will + # only need segment information for the times for which the + # queues will yield triggers, so use a bisection search to + # clip the lists to reduce subsequent operation count. + + age = float(self.time_slide_graph.age) + snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, age):].segmentlist()) for instrument, ratebinlist in rankingstat.denominator.triggerrates.items()) + + # times when SNR was available. used only for code + # correctness checks + + one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1) + # FIXME: this is needed to work around rounding problems + # in safety checks below, trying to compare GPS trigger + # times to float segment boundaries (the boundaries don't + # have enough precision to know if triggers near the edge + # are in or out). it would be better not to have to screw + # around like this. + one_or_more_instruments.protract(1e-3) # 1 ms + + # times when at least 2 instruments were generating SNR. + # used to sieve triggers for inclusion in the denominator. + + two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2) + # FIXME: see comment above.
+ two_or_more_instruments.protract(1e-3) # 1 ms + + # + # iterate over coincidences + # + + gps_time_now = float(lal.UTCToGPS(time.gmtime())) + newly_reported = [] + flushed = [] + flushed_unused = [] + self.last_coincs.clear() + for node, events in self.time_slide_graph.pull(self.delta_t, newly_reported = newly_reported, flushed = flushed, flushed_unused = flushed_unused, coinc_sieve = coinc_sieve, flush = flush): + # safety check the end times + + assert all(event.end in one_or_more_instruments for event in events), "encountered trigger at time when there was no SNR, ranking statistic will fail" + + # construct row objects for coinc tables. latency + # goes in minimum_duration column. NOTE: latency + # is nonsense unless running live. + # FIXME: add a proper column for latency + + coinc, coincmaps, coinc_inspiral = self.coinc_tables.coinc_rows(self.process_id, node.time_slide_id, events, seglists = snr_segments) + coinc_inspiral.minimum_duration = gps_time_now - float(coinc_inspiral.end) + if self.ln_lr_from_triggers is not None: + coinc.likelihood = self.ln_lr_from_triggers(events, node.offset_vector) + if fapfar is not None: + # FIXME: add proper columns to + # store these values in + coinc_inspiral.combined_far = fapfar.far_from_rank(coinc.likelihood) + coinc_inspiral.false_alarm_rate = fapfar.fap_from_rank(coinc.likelihood) + + # cluster candidates + # FIXME: implement clustering + + # ---> + #if self.stream_thinca.last_coincs: + # self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)]) + # <--- + + # some tasks for zero-lag candidates + + if node.is_zero_lag: + # single-detector candidates collected from + # zero-lag time slides during times when + # two or more instruments were producing + # SNR are added to the noise model + + if len(events) == 1 and events[0].end in two_or_more_instruments: + rankingstat.denominator.increment(events[0]) + + # populate ranking statistic's zero-lag + # PDFs with triggers from all zero-lag + # candidates + + for event in events: + rankingstat.zerolag.increment(event) + + # Add events to the observed likelihood + # histogram post "clustering" + + if zerolag_rankingstatpdf is not None and coinc.likelihood is not None: + zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc.likelihood,] += 1 + + + # if min_log_L is None, this test always passes, + # regardless of the value of .likelihood, be it + # None, some number, -inf or even nan. + if coinc.likelihood >= self.min_log_L: + # finally, append coinc to tables + self.coinc_tables.append_coinc(coinc, coincmaps, coinc_inspiral) + self.last_coincs.add(events, coinc, coincmaps, coinc_inspiral) + + # add singles that were not used for any candidates to the + # noise model + + for event in flushed_unused: + if event.end in two_or_more_instruments: + rankingstat.denominator.increment(event) + + # add any triggers that have been used in coincidences for + # the first time to the sngl_inspiral table + # FIXME: because this information comes from the + # coincidence code, which is not aware of the min_log_L cut + # or the clustering, we record a lot of singles that aren't + # really used for any (retained) coincs. + + self.sngl_inspiral_table.extend(newly_reported) + + # save all sngls above the requested sngls SNR threshold. + # all sngls that participated in coincs are already in the + # document, so only need to check for ones being flushed + # and that were never used.
+ + if self.sngls_snr_threshold is not None: + self.sngl_inspiral_table.extend(event for event in flushed_unused if event.snr >= self.sngls_snr_threshold) - # return non-coincident sngls - return noncoinc_sngls + # return the triggers that have been flushed + return flushed
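For orientation, the coinc_sieve callable that the handler passes to .pull() gives the coincidence engine a cheap pre-filter so that the full ln L ranking statistic is only evaluated for candidates that could matter. A minimal sketch of the convention, assuming coinc_sieve follows the same rule as the ntuple_comparefunc it replaces (False/0 = keep, True/non-0 = discard); the trigger type and threshold below are hypothetical stand-ins for the real sngl_inspiral rows and rankingstat.network_snrsq_threshold:

	# illustrative only
	import collections

	SnglTrigger = collections.namedtuple("SnglTrigger", "ifo snr")

	network_snrsq_threshold = 49.	# network SNR 7, matching the old hard-coded cut

	# returns True (= discard) for candidates whose network SNR^2 is
	# below threshold, before the expensive ranking statistic is
	# evaluated
	coinc_sieve = lambda events: sum(event.snr**2. for event in events) < network_snrsq_threshold

	assert coinc_sieve([SnglTrigger("H1", 4.0), SnglTrigger("L1", 4.5)])	# quiet: discarded
	assert not coinc_sieve([SnglTrigger("H1", 6.0), SnglTrigger("L1", 5.5)])	# loud: kept

This mirrors the network SNR fast path enabled in far.py's RankingStat.__call__() above, which assigns ln L = -inf below the same threshold, so sieving those candidates out inside the engine changes nothing downstream.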