Skip to content
Snippets Groups Projects
Commit 3d2f7ff5 authored by Kipp Cannon's avatar Kipp Cannon
Browse files

lloidhandler: word wrap

parent 1a7ada66
No related branches found
No related tags found
No related merge requests found
......@@ -325,15 +325,21 @@ class SegmentsTracker(object):
"""!
A handler that intercepts gate state transitions.
@param elem A reference to the lal_gate element or None (only used for verbosity)
@param timestamp A gstreamer time stamp that marks the state transition (in nanoseconds)
@param segtype the class of segments this gate is defining, e.g., "datasegments", etc..
@param instrument the instrument this state transtion is to be attributed to, e.g., "H1", etc..
@param new_state the state transition, must be either "on" or "off"
@param elem A reference to the lal_gate element or None
(only used for verbosity)
@param timestamp A gstreamer time stamp (integer
nanoseconds) that marks the state transition
@param segtype the class of segments this gate is defining,
e.g., "datasegments", etc..
@param instrument the instrument this state transtion is to
be attributed to, e.g., "H1", etc..
@param new_state the state transition, must be either "on"
or "off"
Must be called with the lock held.
"""
timestamp = LIGOTimeGPS(0, timestamp) # timestamp is in nanoseconds
# convert integer nanoseconds to LIGOTimeGPS
timestamp = LIGOTimeGPS(0, timestamp)
if self.verbose:
print >>sys.stderr, "%s: %s '%s' state transition: %s @ %s" % ((elem.get_name() if elem is not None else "<internal>"), instrument, segtype, new_state, str(timestamp))
......@@ -410,10 +416,12 @@ class SegmentsTracker(object):
def flush_segments_to_disk(self, tag, timestamp):
"""!
Flush segments to disk, e.g., when checkpointing or shutting
down an online pipeline.
Flush segments to disk, e.g., when checkpointing or
shutting down an online pipeline.
@param timestamp the LIGOTimeGPS timestamp of the current buffer in order to close off open segment intervals before writing to disk
@param timestamp the LIGOTimeGPS timestamp of the current
buffer in order to close off open segment intervals before
writing to disk
"""
with self.lock:
# make a copy of the current segmentlistdicts
......@@ -430,7 +438,8 @@ class SegmentsTracker(object):
for seglistdict in seglistdicts.values():
seglistdict -= seglistdict.fromkeys(seglistdict, segments.segmentlist([segments.segment(segments.NegInfinity, timestamp)]))
# write the current (clipped) segmentlistdicts to disk
# write the current (clipped) segmentlistdicts to
# disk
fname = self.__T050017_filename("%s_SEGMENTS" % tag, "xml.gz")
fname = os.path.join(subdir_from_T050017_filename(fname), fname)
ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, gz = fname.endswith('.gz'), verbose = self.verbose, trap_signals = None)
......@@ -519,9 +528,11 @@ class Handler(simplehandler.Handler):
def __init__(self, mainloop, pipeline, coincs_document, rankingstat, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, thinca_interval = 50.0, min_log_L = None, sngls_snr_threshold = None, tag = "", verbose = False):
"""!
@param mainloop The main application's event loop
@param pipeline The gstreamer pipeline that is being controlled by this handler
@param pipeline The gstreamer pipeline that is being
controlled by this handler
@param dataclass A Data class instance
@param tag The tag to use for naming file snapshots, e.g. the description will be "%s_LLOID" % tag
@param tag The tag to use for naming file snapshots, e.g.
the description will be "%s_LLOID" % tag
@param verbose Be verbose
"""
super(Handler, self).__init__(mainloop, pipeline)
......@@ -852,13 +863,15 @@ class Handler(simplehandler.Handler):
# sanity check that gap buffers are empty
assert not (buf_is_gap and events)
# safety check end times. OK for end times to be
# past end of buffer, but we cannot allow triggr
# safety check end times. we cannot allow triggr
# times to go backwards. they cannot precede the
# buffer's start because, below,
# streamthinca.add_events() will be told the
# trigger list is complete upto this buffer's time
# stamp.
# stamp. this logic also requires this method to
# be fed buffers in time order: we must never
# receive a buffer whose timestamp precedes the
# timestamp of a buffer we have already received
assert all(event.end >= buf_timestamp for event in events)
# we have extended the buf_seg above to enclose the
# triggers, make sure that worked
......@@ -898,9 +911,10 @@ class Handler(simplehandler.Handler):
# should be responsible for it somehow, no?
# NOTE: self.snapshot_output_url() writes
# the current rankingstat object to the
# location identified by .ranking_stat_output_url,
# so if that is either not set or at least
# set to a different name than
# location identified by
# .ranking_stat_output_url, so if that is
# either not set or at least set to a
# different name than
# .ranking_stat_input_url the file that has
# just been loaded above will not be
# overwritten.
......@@ -984,22 +998,22 @@ class Handler(simplehandler.Handler):
# FIXME: see comment above.
two_or_more_instruments.protract(1e-3) # 1 ms
# run stream thinca. update the parameter
# distribution data from sngls that weren't used in
# zero-lag multi-instrument coincs. NOTE: we rely
# on the arguments to .chain() being evaluated in
# left-to-right order so that .add_events() is
# evaluated before .last_coincs because the former
# initializes the latter. we skip singles
# collected during times when only one instrument
# was on. NOTE: the definition of "on" is blurry
# since we can recover triggers with end times
# outside of the available strain data, but the
# purpose of this test is simply to prevent signals
# occuring during single-detector times from
# contaminating our noise model, so it's not
# necessary for this test to be super precisely
# defined.
# run stream thinca. update the ranking
# statistic's denominator's histograms from sngls
# that weren't used in zero-lag multi-instrument
# coincs. NOTE: we rely on the arguments to
# .chain() being evaluated in left-to-right order
# so that .add_events() is evaluated before
# .last_coincs because the former initializes the
# latter. we skip singles collected during times
# when only one instrument was on. NOTE: the
# definition of "on" is blurry since we can recover
# triggers with end times outside of the available
# strain data, but the purpose of this test is
# simply to prevent signals occuring during
# single-detector times from contaminating our
# noise model, so it's not necessary for this test
# to be super precisely defined.
for event in itertools.chain(self.stream_thinca.add_events(self.coincs_document.xmldoc, self.coincs_document.process_id, events, buf_timestamp, snr_segments, fapfar = self.fapfar), self.stream_thinca.last_coincs.single_sngl_inspirals() if self.stream_thinca.last_coincs else ()):
if self.ranking_stat_output_url is None:
continue
......@@ -1015,13 +1029,14 @@ class Handler(simplehandler.Handler):
for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id):
self.rankingstat.zerolag.increment(event)
# Cluster last coincs before recording number of zero
# lag events or sending alerts to gracedb
# FIXME Do proper clustering that saves states between
# thinca intervals and uses an independent clustering
# window. This can also go wrong if there are multiple
# events with an identical likelihood. It will just
# choose the event with the highest event id
# Cluster last coincs before recording number of
# zero lag events or sending alerts to gracedb
# FIXME Do proper clustering that saves states
# between thinca intervals and uses an independent
# clustering window. This can also go wrong if
# there are multiple events with an identical
# likelihood. It will just choose the event with
# the highest event id
if self.stream_thinca.last_coincs:
self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)])
......@@ -1076,12 +1091,17 @@ class Handler(simplehandler.Handler):
def horizgatehandler(self, elem, timestamp, (instrument, new_state)):
"""!
A handler that intercepts h(t) gate state transitions to 0 horizon distances.
@param elem A reference to the lal_gate element or None (only used for verbosity)
@param timestamp A gstreamer time stamp that marks the state transition (in nanoseconds)
@param instrument the instrument this state transtion is to be attributed to, e.g., "H1", etc..
@param new_state the state transition, must be either True or False
A handler that intercepts h(t) gate state transitions to 0
horizon distances.
@param elem A reference to the lal_gate element or None
(only used for verbosity)
@param timestamp A gstreamer time stamp that marks the
state transition (in nanoseconds)
@param instrument the instrument this state transtion is to
be attributed to, e.g., "H1", etc..
@param new_state the state transition, must be either True
or False
"""
# essentially we want to set the horizon distance record to
# 0 at both on-to-off and off-to-on transitions so that the
......@@ -1143,7 +1163,9 @@ class Handler(simplehandler.Handler):
"""!
Checkpoint, e.g., flush segments and triggers to disk.
@param timestamp the LIGOTimeGPS timestamp of the current buffer in order to close off open segment intervals before writing to disk
@param timestamp the LIGOTimeGPS timestamp of the current
buffer in order to close off open segment intervals before
writing to disk
"""
# FIXME: the timestamp is used to close off open segments
# and so should *not* be the timestamp of the current
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment