diff --git a/gstlal-inspiral/python/inspiral_pipe.py b/gstlal-inspiral/python/inspiral_pipe.py index e88807254b3b80aedd90c7cab26f69c7e22ec7e8..741db6a739dafa9decc10f7835971933d28cc34e 100644 --- a/gstlal-inspiral/python/inspiral_pipe.py +++ b/gstlal-inspiral/python/inspiral_pipe.py @@ -262,7 +262,10 @@ def aggregator_layer(dag, jobs, options, job_tags): state_routes = ["%s_strain_dropped" % ifo for ifo in options.channel_dict] usage_routes = ["ram_history"] - agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes)) + instrument_latency_routes = ["%s_%s_latency" % (ifo, stage) for ifo in options.channel_dict for stage in ["datasource", "whitening", "snrSlice"]] + pipeline_latency_routes = ["all_%s_latency" % stage for stage in ["itacac"]] + + agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, instrument_latency_routes, pipeline_latency_routes)) gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")] seg_routes = ["%s_%s" % (ifo, gate) for ifo in options.channel_dict for gate in gates] diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py index f4ce9dc280773290acc64a75d8762d8d3232e2cb..34266db4dcc9c90347cc10a35ddf07a106c20d84 100644 --- a/gstlal-inspiral/python/lloidhandler.py +++ b/gstlal-inspiral/python/lloidhandler.py @@ -189,9 +189,10 @@ class EyeCandy(object): gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")] seg_routes = ["%s_%s" % (ifo, gate) for ifo in instruments for gate in gates] - pipeline_latency_routes = ["%s_whitening_latency" % ifo for ifo in instruments] + instrument_latency_routes = ["%s_%s_latency" % (ifo, stage) for ifo in instruments for stage in ["datasource", "whitening", "snrSlice"]] + pipeline_latency_routes = ["all_%s_latency" % stage for stage in ["itacac"]] - topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, pipeline_latency_routes)) + topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, instrument_latency_routes, pipeline_latency_routes)) self.client = kafka.Client("kafka://{}".format(self.kafka_server)) self.client.subscribe(topics)