Skip to content
Snippets Groups Projects
Commit 9c068d6e authored by Kipp Cannon's avatar Kipp Cannon
Browse files

lloidparts: big segment tracking clean-up

- .current_segment_start is removed, instead semi-closed segments are stuck onto the end of each segment list when an "on" transition occurs, and they are clipped when an "off" transition occurs
- this makes it much easier to determine which instruments are beleived to be on at a given time
- it makes the task of check-pointing more straight forward as the segments can be just written to disk as-is
- the "cumulative segments" are renamed to "recent segment history" because they are not cumulative, they are only a look-back into the recent past
- but the bottle route is left unchanged for compatibility with external tools
parent 31d62209
No related branches found
No related tags found
No related merge requests found
......@@ -204,7 +204,6 @@ class Handler(simplehandler.Handler):
self.tag = tag
self.zero_lag_ranking_stats_filename = zero_lag_ranking_stats_filename
self.segment_history_duration = segment_history_duration
self.verbose = verbose
# setup segment list collection from gates
......@@ -242,11 +241,9 @@ class Handler(simplehandler.Handler):
# FIXME: don't do this, get rid of the Data class
dataclass.seglistdicts = self.seglistdicts
# create a deep copy to keep track of cumulative segments
self.cumulative_seglistdicts = self.seglistdicts.copy()
# state of segments being collected
self.current_segment_start = {}
# create a copy to keep track of recent segment history
self.recent_segment_histories = self.seglistdicts.copy()
self.segment_history_duration = segment_history_duration
# iterate over segment types and instruments, look for the
# gate element that should provide those segments, and
......@@ -281,7 +278,7 @@ class Handler(simplehandler.Handler):
# segment lists
bottle.route("/segments.xml")(self.web_get_segments_xml)
bottle.route("/cumulative_segments.xml")(self.web_get_cumulative_segments_xml)
bottle.route("/cumulative_segments.xml")(self.web_get_recent_segment_history_xml)
def do_on_message(self, bus, message):
"""!
......@@ -324,12 +321,22 @@ class Handler(simplehandler.Handler):
elif message.type == Gst.MessageType.EOS:
with self.dataclass.lock:
# FIXME: how to choose correct timestamp?
# note that EOS messages' timestamps are
# set to CLOCK_TIME_NONE so they can't be
# used for this.
try:
timestamp = self.seglistdicts["triggersegments"].extent_all()[1].ns()
except ValueError:
# no segments
return False
self.close_segments(timestamp)
# terminate all segments except
# triggersements.
off = segments.segmentlist([segments.segment(timestamp, segments.PosInfinity)])
for name, seglistdict in self.seglistdicts.items():
if name == "triggersegments":
continue
for seglist in seglistdict.values():
seglist -= off
return False
return False
......@@ -353,35 +360,6 @@ class Handler(simplehandler.Handler):
with self.dataclass.lock:
self._record_horizon_distance(instrument, timestamp, horizon_distance)
def _close_segments(self, timestamp):
"""
@timestamp must be a GPS time that is guaranteed to precede
any possible future state transitions in all segment lists
being tracked.
"""
# close out existing segments. the code in the loop
# modifies the iteration target, so iterate over a copy
for (segtype, instrument), start_time in list(self.current_segment_start.items()):
if timestamp < start_time.ns():
continue
# By construction these gates should be in the on
# state. We fake a state transition to off in
# order to flush the segments
self._gatehandler(None, timestamp, (segtype, instrument, "off"))
# But we have to remember to put it back
self._gatehandler(None, timestamp, (segtype, instrument, "on"))
def close_segments(self, timestamp):
"""!
Record stop times for all open segments and start new ones.
@param timestamp the time in nanoseconds at which to mark
the boundary. If this preceeds and open segment's start
time, that segment is not closed.
"""
with self.dataclass.lock:
self._close_segments(timestamp)
def checkpoint(self, timestamp):
"""!
Checkpoint, e.g., flush segments and triggers to disk.
......@@ -408,38 +386,52 @@ class Handler(simplehandler.Handler):
@param timestamp the gstreamer timestamp in nanoseconds of the current buffer in order to close off open segment intervals before writing to disk
"""
timestamp = LIGOTimeGPS(0, timestamp)
with self.dataclass.lock:
# make a copy of the current segmentlistdicts
seglistdicts = dict((key, value.copy()) for key, value in self.seglistdicts)
# FIXME: in the next step we don't clip the
# triggersegments. the online pipeline needs these
# to accumulate forever, but that might not be what
# it should be doing, nor should these necessarily
# be the segments it uses for livetime. figure
# this out
# keep everything before timestamp in the current
# segmentlistdicts.
dontclip = set(("triggersegments",))
for seglistdict in self.seglistdicts.values():
seglistdict -= seglistdict.fromkeys(set(seglistdict) - dontclip, segments.segmentlist([segments.segment(timestamp, segments.PosInfinity)]))
# keep everything after timestamp in the copy
for seglistdict in seglistdicts.values():
seglistdict -= seglistdict.fromkeys(set(seglistdict) - dontclip, segments.segmentlist([segments.segment(segments.NegInfinity, timestamp)]))
# construct a filename for the current (clipped)
# segmentlistdicts
try:
# close out existing and update cumulative segments.
self._close_segments(timestamp)
self.update_cumulative_segments()
ext = segments.segmentlist(seglistdict.extent_all() for seglistdict in self.seglistdicts.values()).extent()
instruments = set(instrument for seglistdict in self.seglistdicts.values() for instrument in seglistdict)
#FIXME integrate with the Data class snapshotting directories
path = str(int(math.floor(ext[0])))[:5]
try:
os.mkdir(path)
except OSError:
pass
fname = "%s/%s-%s_SEGMENTS-%d-%d.xml.gz" % (path, "".join(sorted(instruments)), self.tag, int(math.floor(ext[0])), int(math.ceil(ext[1])) - int(math.floor(ext[0])))
ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, gz = fname.endswith('.gz'), verbose = self.verbose, trap_signals = None)
# clear the segment lists in place
for segtype, seglistdict in self.seglistdicts.items():
# FIXME: we don't wipe the
# triggersegments for now. the
# online pipeline needs these to
# accumulate forever, but that
# might not be what it should be
# doing, nor should these
# necessarily be the segments it
# uses for livetime. figure this out
if segtype == "triggersegments":
continue
for seglist in seglistdict.values():
del seglist[:]
start, end = segments.segmentlist(seglistdict.extent_all() for name, seglistdict in self.seglistdicts.items() if name not in dontclip).extent()
except ValueError:
print >>sys.stderr, "Warning: couldn't build segment list on checkpoint, probably there aren't any segments"
return
start = int(math.floor(start))
duration = int(math.ceil(end)) - start
# FIXME integrate with the Data class snapshotting
# directories
path = str(start)[:5]
try:
os.mkdir(path)
except OSError:
pass
fname = os.path.join(path, "%s-%s_SEGMENTS-%d-%d.xml.gz" % ("".join(sorted(instruments)), self.tag, start, duration))
# write the current (clipped) segmentlistdicts to
# disk
ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, gz = fname.endswith('.gz'), verbose = self.verbose, trap_signals = None)
# continue with the (clipped) copy
self.seglistdicts = seglistdicts
def _gatehandler(self, elem, timestamp, (segtype, instrument, new_state)):
# FIXME: this method could conceivably be patched to know
......@@ -450,15 +442,13 @@ class Handler(simplehandler.Handler):
# artificially claiming segments to be on beyond the time
# when they should stop.
timestamp = LIGOTimeGPS(0, timestamp) # timestamp is in nanoseconds
state_key = (segtype, instrument)
if self.verbose and elem is not None:
print >>sys.stderr, "%s: %s '%s' state transition: %s @ %s" % (elem.get_name(), instrument, segtype, new_state, str(timestamp))
if new_state == "off":
# record end of segment
if state_key in self.current_segment_start:
self.seglistdicts[segtype][instrument] |= segments.segmentlist((segments.segment(self.current_segment_start.pop(state_key), timestamp),))
self.seglistdicts[segtype][instrument] -= segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),))
# set the horizon distance history to 0 at
# on-to-off transitions of whitened h(t)
if segtype == "whitehtsegments":
......@@ -467,9 +457,8 @@ class Handler(simplehandler.Handler):
else:
self._record_horizon_distance(instrument, float(timestamp), 0.)
elif new_state == "on":
assert state_key not in self.current_segment_start
# record start of new segment
self.current_segment_start[state_key] = timestamp
self.seglistdicts[segtype][instrument] += segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),))
# place a 0 in the horizon distance history at the
# time of an off-to-on transition so that the
# horizon distance queries in the interval of off
......@@ -494,8 +483,14 @@ class Handler(simplehandler.Handler):
# which the horizon distance history could be
# somewhat inaccurate.
if segtype == "whitehtsegments":
if self.seglistdicts[segtype][instrument]:
self._record_horizon_distance(instrument, slice(float(self.seglistdicts[segtype][instrument][-1][-1]), float(timestamp)), 0.)
# this if statement is checking if
# ~self.seglistdicts[segtype][instrument]
# is empty, and if not then it zeros the
# interval spanned by the last segment in
# that list, but what's done below avoids
# the explicit segment arithmetic
if len(self.seglistdicts[segtype][instrument]) > 1:
self._record_horizon_distance(instrument, slice(float(self.seglistdicts[segtype][instrument][-2][-1]), float(timestamp)), 0.)
else:
self._record_horizon_distance(instrument, float(timestamp), 0.)
else:
......@@ -540,54 +535,41 @@ class Handler(simplehandler.Handler):
output.close()
return outstr
def update_cumulative_segments(self):
def update_recent_segment_history(self):
"""!
A method to update the cumulative segment list
A method to update the recent segment histories
"""
current_gps_time = lal.GPSTimeNow()
seglist_to_drop = segments.segmentlist([segments.segment(segments.NegInfinity, current_gps_time - self.segment_history_duration)])
for segtype, seglistdict in self.cumulative_seglistdicts.items():
seglist_to_drop = segments.segmentlist([segments.segment(segments.NegInfinity, current_gps_time - self.segment_history_duration), segments.segment(current_gps_time, segments.PosInfinity)])
for segtype, seglistdict in self.recent_segment_histories.items():
seglistdict.extend(self.seglistdicts[segtype])
seglistdict.coalesce()
for seglist in seglistdict.values():
seglist -= seglist_to_drop
def gen_cumulative_segments_xmldoc(self):
def gen_recent_segment_history_xmldoc(self):
"""!
A method to output the cumulative segment list in a valid
ligolw xml format.
Construct and return a LIGOLW XML tree containing the
recent segment histories.
"""
self.update_recent_segment_history()
xmldoc = ligolw.Document()
xmldoc.appendChild(ligolw.LIGO_LW())
process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {})
# Toggle segments off and on to make sure segment information
# added to the cumulative segments is current This needs to be
# run with self.dataclass.lock, but this function is only
# called currently by web_get_cumulative_segments_xml, which
# calls with with self.dataclass.lock
try:
# FIXME Timestamp here needs to be thought about more,
# for the same reason mentioned _gatehandler
timestamp = self.seglistdicts["triggersegments"].extent_all()[1].ns()
self._close_segments(timestamp)
except ValueError:
# no segments
print >>sys.stderr, "cannot close segments before updating cumulative segments, segment info may be incomplete"
self.update_cumulative_segments()
with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments:
for segtype, seglistdict in self.cumulative_seglistdicts.items():
for segtype, seglistdict in self.recent_segment_histories.items():
ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot")
ligolw_process.set_process_end_time(process)
return xmldoc
def web_get_cumulative_segments_xml(self):
def web_get_recent_segment_history_xml(self):
"""!
provide a bottle route to get cumulative segment information via a url
provide a bottle route to get recent segment history
information via a url
"""
with self.dataclass.lock:
output = StringIO.StringIO()
ligolw_utils.write_fileobj(self.gen_cumulative_segments_xmldoc(), output)
ligolw_utils.write_fileobj(self.gen_recent_segment_history_xmldoc(), output)
outstr = output.getvalue()
output.close()
return outstr
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment