Skip to content
Snippets Groups Projects
Commit b30e8e6a authored by Kipp Cannon's avatar Kipp Cannon
Browse files

gstlal_inspiral: port to native streaming coincidence

- rewrite streamthinca as a direct interface to the new streaming
coincidence engine in snglcoinc.py instead of being a wrapper around the
offline coincidence engine in thinca.py
parent 95cfb66d
No related branches found
No related tags found
No related merge requests found
......@@ -283,7 +283,6 @@ def parse_command_line():
group.add_option("--job-tag", metavar = "tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc.; may not contain \".\" nor \"-\".")
group.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data, performs downsampling, and stores to local filespace. ")
group.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
group.add_option("--thinca-interval", metavar = "seconds", type = "float", default = 30.0, help = "Set the thinca interval (default = 30 s).")
group.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
group.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
group.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
......@@ -823,7 +822,6 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
rankingstatpdf_url = options.ranking_stat_pdf,
zerolag_rankingstatpdf_url = zerolag_rankingstat_pdf,
likelihood_snapshot_interval = options.likelihood_snapshot_interval,
thinca_interval = options.thinca_interval,
min_log_L = options.min_log_L,
sngls_snr_threshold = options.singles_threshold,
tag = options.job_tag,
......
......@@ -136,17 +136,17 @@ class RankingStat(snglcoinc.LnLikelihoodRatioMixin):
self.denominator = inspiral_lr.LnNoiseDensity(template_ids = template_ids, instruments = instruments, delta_t = delta_t, min_instruments = min_instruments)
self.zerolag = inspiral_lr.LnLRDensity(template_ids = template_ids, instruments = instruments, delta_t = delta_t, min_instruments = min_instruments)
# def __call__(self, **kwargs):
# # FIXME FIXME FIXME This fast path should be reenabled when we
# # know it scales well. It is currently implemented in the
# # streamthinca ntuple compare func
# # fast-path: network SNR cut
# #if sum(snr**2. for snr in kwargs["snrs"].values()) < self.network_snrsq_threshold:
# # return NegInf
# # full ln L ranking stat. we define the ranking statistic
# # to be the largest ln L from all allowed subsets of
# # triggers
# return max(super(RankingStat, self).__call__(**kwargs) for kwargs in kwarggen(min_instruments = self.min_instruments, **kwargs))
def __call__(self, **kwargs):
# fast-path: network SNR cut
if sum(snr**2. for snr in kwargs["snrs"].values()) < self.network_snrsq_threshold:
return NegInf
# full ln L ranking stat. we define the ranking statistic
# to be the largest ln L from all allowed subsets of
# triggers
# FIXME: temporarily disabled due to performance concerns.
# just chain to parent class
return super(RankingStat, self).__call__(**kwargs)
#return max(super(RankingStat, self).__call__(**kwargs) for kwargs in kwarggen(min_instruments = self.min_instruments, **kwargs))
@property
def template_ids(self):
......
......@@ -46,7 +46,6 @@
#
import bisect
from collections import deque
try:
from fpconst import NaN
......@@ -120,18 +119,6 @@ def subdir_from_T050017_filename(fname):
return path
def lower_bound_in_seglist(seglist, x):
"""
Return the index of the segment immediately at or before x in the
coalesced segment list seglist. Operation is O(log n).
"""
# NOTE: this is an operation that is performed in a number of
# locations in various codes, and is something that I've screwed up
# more than once. maybe this should be put into segments.py itself
i = bisect.bisect_right(seglist, x)
return i - 1 if i else 0
#
# =============================================================================
#
......@@ -587,7 +574,7 @@ class Handler(simplehandler.Handler):
dumps of segment information, trigger files and background
distribution statistics.
"""
def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, thinca_interval = 50.0, min_log_L = None, sngls_snr_threshold = None, tag = "", kafka_server = "10.14.0.112:9092", verbose = False):
def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, min_log_L = None, sngls_snr_threshold = None, tag = "", kafka_server = "10.14.0.112:9092", verbose = False):
"""!
@param mainloop The main application's event loop
@param pipeline The gstreamer pipeline that is being
......@@ -638,20 +625,14 @@ class Handler(simplehandler.Handler):
bottle.route("/sngls_snr_threshold.txt", method = "POST")(self.web_set_sngls_snr_threshold)
bottle.route("/zerolag_rankingstatpdf.xml")(self.web_get_zerolag_rankingstatpdf)
#
# all triggers up to this index have had their SNR time
# series deleted to save memory
#
self.snr_time_series_cleanup_index = 0
#
# attach a StreamThinca instance to ourselves
#
self.stream_thinca = streamthinca.StreamThinca(
coincidence_threshold = rankingstat.delta_t,
thinca_interval = thinca_interval, # seconds
coincs_document.xmldoc,
coincs_document.process_id,
delta_t = rankingstat.delta_t,
min_instruments = rankingstat.min_instruments,
min_log_L = min_log_L,
sngls_snr_threshold = sngls_snr_threshold
......@@ -696,24 +677,24 @@ class Handler(simplehandler.Handler):
if self.ranking_stat_input_url is not None:
if self.rankingstat.is_healthy(self.verbose):
self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish()
self.stream_thinca.ln_lr_from_triggers = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish().ln_lr_from_triggers
if self.verbose:
print >>sys.stderr, "ranking statistic assignment ENABLED"
else:
del self.stream_thinca.rankingstat
self.stream_thinca.ln_lr_from_triggers = None
if self.verbose:
print >>sys.stderr, "ranking statistic assignment DISABLED"
elif min_log_L is not None:
self.stream_thinca.rankingstat = far.DatalessRankingStat(
self.stream_thinca.ln_lr_from_triggers = far.DatalessRankingStat(
template_ids = rankingstat.template_ids,
instruments = rankingstat.instruments,
min_instruments = rankingstat.min_instruments,
delta_t = rankingstat.delta_t
).finish()
).finish().ln_lr_from_triggers
if self.verbose:
print >>sys.stderr, "ranking statistic assignment ENABLED"
else:
del self.stream_thinca.rankingstat
self.stream_thinca.ln_lr_from_triggers = None
if self.verbose:
print >>sys.stderr, "ranking statistic assignment DISABLED"
......@@ -779,6 +760,17 @@ class Handler(simplehandler.Handler):
name = "%s_ht_gate" % instrument
elem = pipeline.get_by_name(name)
if elem is None:
# FIXME: if there is no data for an
# instrument for which we have ranking
# statistic data then the horizon distance
# record needs to indicate that it was off
# for the entire segment. should probably
# 0 the horizon distance history at start
# and stop of each stream, but there is a
# statement in the EOS handler that we
# don't do that, and I can remember why
# that is.
continue
raise ValueError("cannot find \"%s\" element for %s" % (name, instrument))
if verbose:
print >>sys.stderr, "\tfound %s for %s" % (name, instrument)
......@@ -1002,11 +994,11 @@ class Handler(simplehandler.Handler):
# update streamthinca's ranking statistic
# data
if self.rankingstat.is_healthy(self.verbose):
self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish()
self.stream_thinca.ln_lr_from_triggers = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish().ln_lr_from_triggers
if self.verbose:
print >>sys.stderr, "ranking statistic assignment ENABLED"
else:
del self.stream_thinca.rankingstat
self.stream_thinca.ln_lr_from_triggers = None
if self.verbose:
print >>sys.stderr, "ranking statistic assignment DISABLED"
......@@ -1035,102 +1027,21 @@ class Handler(simplehandler.Handler):
snr_min = self.rankingstat.snr_min
self.rankingstat.denominator.triggerrates[instrument].add_ratebin(map(float, buf_seg), len([event for event in events if event.snr >= snr_min]))
# extract times when instruments were producing
# SNR. used to define "on instruments" for coinc
# tables among other things. will only need
# segment information for the times for which we
# have triggers, so use stream_thinca's discard
# boundary and a bisection search to clip the lists
# to reduce subsequent operation count.
discard_boundary = float(self.stream_thinca.discard_boundary)
snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items())
# times when SNR was available. used only for code
# correctness checks
one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1)
# FIXME: this is needed to work around rounding
# problems in safety checks below, trying to
# compare GPS trigger times to float segment
# boundaries (the boundaries don't have enough
# precision to know if triggers near the edge are
# in or out). it would be better not to have to
# screw around like this.
one_or_more_instruments.protract(1e-3) # 1 ms
# times when at least 2 instruments were generating
# SNR. used to sieve triggers for inclusion in the
# denominator.
two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2)
# FIXME: see comment above.
two_or_more_instruments.protract(1e-3) # 1 ms
# run stream thinca. update the ranking
# statistic's denominator's histograms from sngls
# that weren't used in zero-lag multi-instrument
# coincs. NOTE: we rely on the arguments to
# .chain() being evaluated in left-to-right order
# so that .add_events() is evaluated before
# .last_coincs because the former initializes the
# latter. we skip singles collected during times
# when only one instrument was on. NOTE: the
# definition of "on" is blurry since we can recover
# triggers with end times outside of the available
# strain data, but the purpose of this test is
# simply to prevent signals occuring during
# single-detector times from contaminating our
# noise model, so it's not necessary for this test
# to be super precisely defined.
for event in itertools.chain(self.stream_thinca.add_events(self.coincs_document.xmldoc, self.coincs_document.process_id, events, buf_timestamp, snr_segments, fapfar = self.fapfar), self.stream_thinca.last_coincs.single_sngl_inspirals() if self.stream_thinca.last_coincs else ()):
if self.ranking_stat_output_url is None:
continue
assert event.end in one_or_more_instruments, "trigger at time (%s) when there was no SNR (%s)" % (str(event.end), str(one_or_more_instruments))
if event.end in two_or_more_instruments:
self.rankingstat.denominator.increment(event)
self.coincs_document.commit()
# update zero-lag bin counts in rankingstat.
if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None:
for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids:
for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id):
self.rankingstat.zerolag.increment(event)
# Cluster last coincs before recording number of
# zero lag events or sending alerts to gracedb
# FIXME Do proper clustering that saves states
# between thinca intervals and uses an independent
# clustering window. This can also go wrong if
# there are multiple events with an identical
# likelihood. It will just choose the event with
# the highest event id
if self.stream_thinca.last_coincs:
self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)])
# Add events to the observed likelihood histogram
# post "clustering"
# FIXME proper clustering is really needed (see
# above)
if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None:
for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids:
self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1
# do GraceDB alerts and update eye candy
self.__do_gracedb_alerts()
self.eye_candy.update(events, self.stream_thinca.last_coincs)
# after doing alerts, no longer need per-trigger
# SNR data for the triggers that are too old to be
# used to form candidates. to avoid searching the
# entire list of triggers each time, we stop when
# we encounter the first trigger whose SNR series
# might still be needed, save its index, and start
# the search from there next time
discard_boundary = self.stream_thinca.discard_boundary
for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index):
if event.end >= discard_boundary:
break
del event.snr_time_series
# run stream thinca.
if self.stream_thinca.push(instrument, events, buf_timestamp):
flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = lambda events: sum(event.snr**2. for event in events) >= self.rankingstat.network_snrsq_threshold)
self.coincs_document.commit()
# do GraceDB alerts and update eye candy
self.__do_gracedb_alerts(self.stream_thinca.last_coincs)
self.eye_candy.update(events, self.stream_thinca.last_coincs)
# after doing alerts, no longer need
# per-trigger SNR data for the triggers
# that are too old to be used to form
# candidates.
for event in flushed_sngls:
del event.snr_time_series
def _record_horizon_distance(self, instrument, timestamp, horizon_distance):
......@@ -1310,78 +1221,24 @@ class Handler(simplehandler.Handler):
def __flush(self):
# run StreamThinca's .flush(). returns the last remaining
# non-coincident sngls. add them to the distribution. as
# above in appsink_new_buffer() we skip singles collected
# during times when only one instrument was one.
# times when at least 2 instruments were generating SNR.
# used to sieve triggers for inclusion in the denominator.
discard_boundary = float(self.stream_thinca.discard_boundary)
snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items())
one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1)
# FIXME: see comment in appsink_new_buffer()
one_or_more_instruments.protract(1e-3) # 1 ms
two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2)
# FIXME: see comment in appsink_new_buffer()
two_or_more_instruments.protract(1e-3) # 1 ms
for event in self.stream_thinca.flush(self.coincs_document.xmldoc, self.coincs_document.process_id, snr_segments, fapfar = self.fapfar):
if self.ranking_stat_output_url is None:
continue
assert event.end in one_or_more_instruments, "trigger at time (%s) with no SNR (%s)" % (str(event.end), str(one_or_more_instruments))
if event.end in two_or_more_instruments:
self.rankingstat.denominator.increment(event)
self.coincs_document.commit()
# run StreamThinca in flush mode. forms candidates from
# whatever triggers remain in the queues, and processes
# them
# update zero-lag bin counts in rankingstat.
if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None:
for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids:
for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id):
self.rankingstat.zerolag.increment(event)
# Cluster last coincs before recording number of zero
# lag events or sending alerts to gracedb
# FIXME Do proper clustering that saves states between
# thinca intervals and uses an independent clustering
# window. This can also go wrong if there are multiple
# events with an identical likelihood. It will just
# choose the event with the highest event id
if self.stream_thinca.last_coincs:
self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)])
# Add events to the observed likelihood histogram post
# "clustering"
# FIXME proper clustering is really needed (see above)
if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None:
for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids:
self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1
flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, flush = True)
self.coincs_document.commit()
# do GraceDB alerts
self.__do_gracedb_alerts()
self.__do_gracedb_alerts(self.stream_thinca.last_coincs)
# after doing alerts, no longer need per-trigger SNR data
# for the triggers that are too old to be used to form
# candidates. to avoid searching the entire list of
# triggers each time, we stop when we encounter the first
# trigger whose SNR series might still be needed, save its
# index, and start the search from there next time
discard_boundary = self.stream_thinca.discard_boundary
for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index):
if event.end >= discard_boundary:
break
# candidates.
for event in flushed_sngls:
del event.snr_time_series
# garbage collect last_coincs
# FIXME: this should not be needed. something is wrong.
# if this is needed, then why don't we have to garbage
# collect everything ourselves?
self.stream_thinca.last_coincs = {}
def __do_gracedb_alerts(self):
def __do_gracedb_alerts(self, last_coincs):
# check for no-op
if self.rankingstatpdf is None or not self.rankingstatpdf.is_healthy():
return
......@@ -1392,7 +1249,7 @@ class Handler(simplehandler.Handler):
assert self.fapfar is not None
# do alerts
self.gracedbwrapper.do_alerts(self.stream_thinca.last_coincs, self.psds, self.__get_rankingstat_xmldoc)
self.gracedbwrapper.do_alerts(last_coincs, self.psds, self.__get_rankingstat_xmldoc)
def web_get_sngls_snr_threshold(self):
......@@ -1454,4 +1311,10 @@ class Handler(simplehandler.Handler):
coincs_document = self.coincs_document.get_another()
self.__write_output_url(url = fname, verbose = verbose)
self.coincs_document = coincs_document
self.snr_time_series_cleanup_index = 0
# NOTE: this operation requires stream_thinca to
# have been flushed by calling .pull() with flush =
# True. the .__write_output_url() code path does
# that, but there is no check here to ensure that
# remains true so be careful if you are editing
# these methods.
self.stream_thinca.set_xmldoc(self.coincs_document.xmldoc, self.coincs_document.process_id)
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment