Skip to content
Snippets Groups Projects
Commit 745d77a6 authored by Hunter Schuler's avatar Hunter Schuler Committed by Patrick Godwin
Browse files

[lloidhandler.py] Implement single stage pipeline latency monitoring

[lloidhandler.py] Make use of latency pipeline latency tracking

[lloidhandler.py] Use latency pipeline timestamp

[lloidhandler.py] Implement unique latency pipeline names

[lloidhandler.py] Fix infinite loop

[lloidhandler.py] Add full name to verbosity

[lloidhandler.py] Fix attaching to latency pipeline element

[lloidhandler.py] Fix data push to kafka

Merge branch 'master' of git.ligo.org:lscsoft/gstlal
parent 05258794
No related branches found
No related tags found
1 merge request!29Latency monitoring refactor
......@@ -31,3 +31,4 @@ configure
*/tests/*.log
*/tests/*.trs
libtool
.vscode
......@@ -132,10 +132,11 @@ def subdir_from_T050017_filename(fname):
class EyeCandy(object):
def __init__(self, instruments, kafka_server, tag, pipeline, segmentstracker):
def __init__(self, instruments, kafka_server, tag, pipeline, segmentstracker, latencytracker):
self.kafka_server = kafka_server
self.tag = tag
self.gate_history = segmentstracker.gate_history
self.pipeline_latency_history = latencytracker.pipeline_history
self.latency_histogram = rate.BinnedArray(rate.NDBins((rate.LinearPlusOverflowBins(5, 205, 22),)))
# NOTE most of this data is collected at 1Hz, thus a 300
# element deque should hold about 5 minutes of history.
......@@ -188,7 +189,9 @@ class EyeCandy(object):
gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")]
seg_routes = ["%s_%s" % (ifo, gate) for ifo in instruments for gate in gates]
topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes))
pipeline_latency_routes = ["%s_whitening_latency" % ifo for ifo in instruments]
topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, pipeline_latency_routes))
self.client = kafka.Client("kafka://{}".format(self.kafka_server))
self.client.subscribe(topics)
......@@ -297,6 +300,14 @@ class EyeCandy(object):
gate_onoff = interp1d(gate_interp_times, gate_interp_onoff, kind='zero')(gate_times)
self.kafka_data["%s_%s" % (instrument, gate)]["time"].extend([t for t in gate_times if t >= self.time_since_last_state])
self.kafka_data["%s_%s" % (instrument, gate)]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state])
# collect any new latencies
for stage in self.pipeline_latency_history.keys():
for instrument in self.pipeline_latency_history[stage].keys():
while (self.pipeline_latency_history[stage][instrument]):
data = self.pipeline_latency_history[stage][instrument].pop()
self.kafka_data["%s_%s_latency" % (instrument, stage)]["time"].append(data[0])
self.kafka_data["%s_%s_latency" % (instrument, stage)]["data"].append(data[1])
# collect strain dropped samples
for instrument, elem in self.strain.items():
......@@ -639,6 +650,76 @@ class SegmentsTracker(object):
output.close()
return outstr
#
# =============================================================================
#
# Pipeline Latency Tracker
#
# =============================================================================
#
class LatencyTracker(object):
	"""Track recent per-stage, per-instrument latencies reported by the pipeline.

	Connects to the "notify::current-latency" signal of latency elements
	named "<instrument>_<stage>_latency" found in the pipeline and records
	(timestamp, latency) pairs in bounded per-stage, per-instrument deques
	(self.pipeline_history), which consumers (e.g. EyeCandy) drain.
	"""
	def __init__(self, pipeline, instruments, verbose = False):
		"""!
		@param pipeline GStreamer pipeline searched with get_by_name()
		for latency elements
		@param instruments iterable of instrument names, e.g. ("H1", "L1")
		@param verbose if True report handler hook-up progress on stderr
		"""
		self.lock = threading.Lock()
		self.verbose = verbose
		# recent pipeline stage latencies.  bounded deques so
		# unconsumed entries cannot grow without limit
		pipeline_stages = ["whitening"]
		self.pipeline_history = {stage: {instrument: deque(maxlen = 20) for instrument in instruments} for stage in pipeline_stages}
		# iterate over pipeline stages, look for the
		# stage element that should provide the latencies, and
		# connect handlers to collect the latencies
		if verbose:
			# BUGFIX: original code passed sys.stderr as a
			# positional argument, printing the stream object's
			# repr ahead of the message
			print("connecting pipeline latency handlers ...", file = sys.stderr)
		for stage in self.pipeline_history:
			for instrument in self.pipeline_history[stage]:
				elem = pipeline.get_by_name("%s_%s_latency" % (instrument, stage))
				if elem is None:
					# missing element is not fatal:  that
					# stage simply goes unmonitored
					if verbose:
						print("\tcould not find %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file = sys.stderr)
					continue
				if verbose:
					print("\tfound %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file = sys.stderr)
				elem.connect("notify::current-latency", self.latencyhandler, (stage, instrument, "stop"))
		if verbose:
			print("... done connecting pipeline latency handlers", file = sys.stderr)

	def __latencyhandler(self, elem, timestamp, stage_latency_input):
		"""!
		A handler that intercepts pipeline latency notifications.

		@param elem the latency element that emitted the signal
		@param timestamp second positional argument supplied by the
		signal machinery.  NOTE(review): for "notify" signals this is
		the GParamSpec, not a time;  it is ignored and immediately
		replaced with the element's "timestamp" property below
		@param stage_latency_input (stage, instrument, state) tuple
		identifying the pipeline stage that produced the latency

		Must be called with the lock held.
		"""
		# parse stage_latency_input
		stage, instrument, state = stage_latency_input
		# parse properties from element trigger.  NOTE(review):
		# elem is dereferenced here although the name lookup below
		# guards against None;  in practice the handler is only ever
		# connected to real elements, so elem cannot be None
		latency = elem.get_property("current-latency")
		name = elem.get_property("name") if elem is not None else "<internal>"
		timestamp = elem.get_property("timestamp")
		if self.verbose:
			print("%s: %s trigger @ %s with latency %s" % (name, state, str(timestamp), str(latency)), file = sys.stderr)
		if state == "stop":
			# record pipeline stage latency
			self.pipeline_history[stage][instrument].append((float(timestamp), float(latency)))
		else:
			assert False, "impossible state '%s'" % state

	def latencyhandler(self, elem, timestamp, stage_latency_input):
		# serialize access to pipeline_history
		with self.lock:
			self.__latencyhandler(elem, timestamp, stage_latency_input)
#
# =============================================================================
......@@ -698,11 +779,17 @@ class Handler(simplehandler.Handler):
self.segmentstracker = SegmentsTracker(pipeline, rankingstat.instruments, verbose = verbose)
#
# setup latency collection from pipeline
#
self.latencytracker = LatencyTracker(pipeline, rankingstat.instruments, verbose = verbose)
#
# set up metric collection
#
self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.tag, pipeline, self.segmentstracker)
self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.tag, pipeline, self.segmentstracker, self.latencytracker)
# FIXME: detangle this
self.eye_candy.lock = self.lock
......
......@@ -625,6 +625,7 @@ def mkLLOIDmulti(pipeline, detectors, banks, psd, psd_fft_length = 32, ht_gate_t
hoftdicts = {}
for instrument in detectors.channel_dict:
src, statevector, dqvector = datasource.mkbasicsrc(pipeline, detectors, instrument, verbose)
src = pipeparts.mklatency(pipeline, src, name = "%s_whitening_latency" % instrument, silent = True)
hoftdicts[instrument] = multirate_datasource.mkwhitened_multirate_src(
pipeline,
src = src,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment