diff --git a/.gitignore b/.gitignore index 5ff5c0c8ac439576da7aeeb79c5259e39a88e028..4f3e09f93f9639c9289e69bdb054a05b75aa2331 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ configure */tests/*.log */tests/*.trs libtool +.vscode diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py index b66f149101ec9de87d97f971257fc49753a53cd3..12b1ffbfe5823064a007c824ee649d5bda62d587 100644 --- a/gstlal-inspiral/python/lloidhandler.py +++ b/gstlal-inspiral/python/lloidhandler.py @@ -132,10 +132,11 @@ def subdir_from_T050017_filename(fname): class EyeCandy(object): - def __init__(self, instruments, kafka_server, tag, pipeline, segmentstracker): + def __init__(self, instruments, kafka_server, tag, pipeline, segmentstracker, latencytracker): self.kafka_server = kafka_server self.tag = tag self.gate_history = segmentstracker.gate_history + self.pipeline_latency_history = latencytracker.pipeline_history self.latency_histogram = rate.BinnedArray(rate.NDBins((rate.LinearPlusOverflowBins(5, 205, 22),))) # NOTE most of this data is collected at 1Hz, thus a 300 # element deque should hold about 5 minutes of history. @@ -188,7 +189,9 @@ class EyeCandy(object): gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")] seg_routes = ["%s_%s" % (ifo, gate) for ifo in instruments for gate in gates] - topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes)) + pipeline_latency_routes = ["%s_whitening_latency" % ifo for ifo in instruments] + + topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, pipeline_latency_routes)) self.client = kafka.Client("kafka://{}".format(self.kafka_server)) self.client.subscribe(topics) @@ -297,6 +300,14 @@ class EyeCandy(object): gate_onoff = interp1d(gate_interp_times, gate_interp_onoff, kind='zero')(gate_times) self.kafka_data["%s_%s" % (instrument, gate)]["time"].extend([t for t in gate_times if t >= self.time_since_last_state]) self.kafka_data["%s_%s" % (instrument, gate)]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state]) + + # collect any new latencies + for stage in self.pipeline_latency_history.keys(): + for instrument in self.pipeline_latency_history[stage].keys(): + while (self.pipeline_latency_history[stage][instrument]): + data = self.pipeline_latency_history[stage][instrument].pop() + self.kafka_data["%s_%s_latency" % (instrument, stage)]["time"].append(data[0]) + self.kafka_data["%s_%s_latency" % (instrument, stage)]["data"].append(data[1]) # collect strain dropped samples for instrument, elem in self.strain.items(): @@ -639,6 +650,76 @@ class SegmentsTracker(object): output.close() return outstr +# +# ============================================================================= +# +# Pipeline Latency Tracker +# +# ============================================================================= +# + +class LatencyTracker(object): + def __init__(self, pipeline, instruments, verbose = False): + self.lock = threading.Lock() + self.verbose = verbose + + # recent pipeline stage latencies + pipeline_stages = ["whitening"] + self.pipeline_history = {stage: {instrument: deque(maxlen = 20) for instrument in instruments} for stage in pipeline_stages} + + # iterate over pipeline stages, look for the + # stage element that should provide the latencies, and + # connect handlers to collect the latencies + if verbose: + print(sys.stderr, "connecting pipeline latency handlers ...", file=sys.stderr) + for stage in self.pipeline_history.keys(): + for instrument in self.pipeline_history[stage].keys(): + elem = pipeline.get_by_name("%s_%s_latency" % (instrument, stage)) + if elem is None: + if verbose: + print("\tcould not find %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file=sys.stderr) + continue + if verbose: + print("\tfound %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file=sys.stderr) + elem.connect("notify::current-latency", self.latencyhandler, (stage, instrument, "stop")) + if verbose: + print("... done connecting pipeline latency handlers", file=sys.stderr) + + + def __latencyhandler(self, elem, timestamp, stage_latency_input): + """! + A handler that intercepts pipeline latency calls. + + @param elem A reference to the lal_gate element or None + (only used for verbosity) + @param timestamp A gstreamer time stamp (integer + nanoseconds) that marks the state transition + @param stage_latency_input name, instrument and state of the stage + from the pipeline + + Must be called with the lock held. + """ + # parse stage_latency_input + stage, instrument, state = stage_latency_input + + # parse properties from element trigger + latency = elem.get_property("current-latency") + name = elem.get_property("name") if elem is not None else "<internal>" + timestamp = elem.get_property("timestamp") + + if self.verbose: + print("%s: %s trigger @ %s with latency %s" % (name, state, str(timestamp), str(latency)), file= sys.stderr) + + if state == "stop": + # record pipeline stage latency + self.pipeline_history[stage][instrument].append((float(timestamp), float(latency))) + else: + assert False, "impossible state '%s'" % state + + + def latencyhandler(self, elem, timestamp, stage_latency_input): + with self.lock: + self.__latencyhandler(elem, timestamp, stage_latency_input) # # ============================================================================= @@ -698,11 +779,17 @@ class Handler(simplehandler.Handler): self.segmentstracker = SegmentsTracker(pipeline, rankingstat.instruments, verbose = verbose) + # + # setup latency collection from pipeline + # + + self.latencytracker = LatencyTracker(pipeline, rankingstat.instruments, verbose = verbose) + # # set up metric collection # - self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.tag, pipeline, self.segmentstracker) + self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.tag, pipeline, self.segmentstracker, self.latencytracker) # FIXME: detangle this self.eye_candy.lock = self.lock diff --git a/gstlal-inspiral/python/lloidparts.py b/gstlal-inspiral/python/lloidparts.py index 504615a876cac7824a93111a9d1fa0a8bf6b7e4e..0ca8214bed9bc97b8cff28ae2c0ad1e39584f4b2 100644 --- a/gstlal-inspiral/python/lloidparts.py +++ b/gstlal-inspiral/python/lloidparts.py @@ -625,6 +625,7 @@ def mkLLOIDmulti(pipeline, detectors, banks, psd, psd_fft_length = 32, ht_gate_t hoftdicts = {} for instrument in detectors.channel_dict: src, statevector, dqvector = datasource.mkbasicsrc(pipeline, detectors, instrument, verbose) + src = pipeparts.mklatency(pipeline, src, name = "%s_whitening_latency" % instrument, silent = True) hoftdicts[instrument] = multirate_datasource.mkwhitened_multirate_src( pipeline, src = src,