Commit 3db497a5 authored by Divya Singh, committed by Rebecca Ewing


gstlal-inspiral/python/lloidhandler.py: adding topic_prefix to kafka topics for separating inj and noninj data streams

gstlal-inspiral/python/dags/layers/inspiral.py: separate inj and noninj topics in scald jobs
parent 33fcd657
1 merge request: !209 O4 Online
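
For context, a minimal sketch of the naming scheme this commit introduces: an empty prefix keeps the original (noninj) topic names, while an "inj_" prefix marks the injection stream. The interferometer names below are illustrative assumptions, not values from the commit:

    # Illustrative sketch of the inj/noninj topic naming (ifos are assumed, not from the commit)
    topic_prefix = ['', 'inj_']  # '' -> noninj stream, 'inj_' -> injection stream
    ifos = ["H1", "L1"]
    snr_metrics = [f"{prefix}{ifo}_snr_history" for ifo in ifos for prefix in topic_prefix]
    print(snr_metrics)
    # ['H1_snr_history', 'inj_H1_snr_history', 'L1_snr_history', 'inj_L1_snr_history']
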
gstlal-inspiral/python/dags/layers/inspiral.py
@@ -1921,20 +1921,29 @@ def collect_metrics_layer(config, dag):
     common_opts = [
         Argument("command", "aggregate"),
         Option("config", config.metrics.scald_config),
-        Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
     ]
+    # set topic_prefix to distinguish inj and noninj topics
+    topic_prefix = ['', 'inj_']
     # define metrics used for aggregation jobs
-    snr_metrics = [f"{ifo}_snr_history" for ifo in config.ifos]
-    network_metrics = ["likelihood_history", "snr_history", "latency_history", "far_history"]
-    state_metrics = [f"{ifo}_strain_dropped" for ifo in config.ifos]
-    usage_metrics = ["ram_history"]
-    latency_metrics = [f"{ifo}_{stage}_latency" for ifo in config.ifos for stage in ("datasource", "whitening", "snrSlice")]
-    latency_metrics.append("all_itacac_latency")
+    snr_metrics = [f"{prefix}{ifo}_snr_history" for ifo in config.ifos for prefix in topic_prefix]
+    network_metrics = [
+        f"{prefix}{metric}"
+        for prefix in topic_prefix
+        for metric in ("likelihood_history", "snr_history", "latency_history", "far_history")
+    ]
+    state_metrics = [f"{prefix}{ifo}_strain_dropped" for ifo in config.ifos for prefix in topic_prefix]
+    usage_metrics = [f"{prefix}ram_history" for prefix in topic_prefix]
+    latency_metrics = [f"{prefix}{ifo}_{stage}_latency" for ifo in config.ifos for stage in ("datasource", "whitening", "snrSlice") for prefix in topic_prefix]
+    latency_metrics.extend(f"{prefix}all_itacac_latency" for prefix in topic_prefix)
     agg_metrics = list(itertools.chain(snr_metrics, network_metrics, usage_metrics, state_metrics, latency_metrics))
     gates = [f"{gate}segments" for gate in ("statevector", "dqvector", "whiteht")]
-    seg_metrics = [f"{ifo}_{gate}" for ifo in config.ifos for gate in gates]
+    seg_metrics = [f"{prefix}{ifo}_{gate}" for ifo in config.ifos for gate in gates for prefix in topic_prefix]
     # set up partitioning
     # FIXME don't hard code the 1000
@@ -1951,11 +1960,9 @@
     # add jobs to consume metrics from non-injection jobs
     arguments = list(common_opts)
     arguments.extend([
+        Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
         Option("data-type", "timeseries"),
         Option("topic", [f"gstlal.{config.tag}.{metric}" for metric in metrics]),
         Option("schema", metrics),
+        Option("tag", "noninj"),
     ])
     # elect first metric collector as leader
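
For reference, a hedged sketch of the fully qualified topic names the aggregation jobs consume, following the f"gstlal.{config.tag}.{metric}" pattern in the hunk above; the tag value here is an assumption for illustration:

    # Hypothetical expansion of Option("topic", ...) above; "online-test" is an assumed config.tag
    tag = "online-test"
    metrics = ["H1_snr_history", "inj_H1_snr_history"]
    topics = [f"gstlal.{tag}.{metric}" for metric in metrics]
    print(topics)
    # ['gstlal.online-test.H1_snr_history', 'gstlal.online-test.inj_H1_snr_history']
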
gstlal-inspiral/python/lloidhandler.py
@@ -198,15 +198,15 @@ class EyeCandy(object):
         if self.kafka_server is not None:
             from ligo.scald.io import kafka
-            snr_routes = ["%s_snr_history" % ifo for ifo in instruments]
-            network_routes = ["likelihood_history", "snr_history", "latency_history"]
-            state_routes = ["%s_strain_dropped" % ifo for ifo in instruments]
-            usage_routes = ["ram_history"]
-            gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")]
-            seg_routes = ["%s_%s" % (ifo, gate) for ifo in instruments for gate in gates]
+            snr_routes = [f"{self.topic_prefix}{ifo}_snr_history" for ifo in instruments]
+            network_routes = [f"{self.topic_prefix}likelihood_history", f"{self.topic_prefix}snr_history", f"{self.topic_prefix}latency_history"]
+            state_routes = [f"{self.topic_prefix}{ifo}_strain_dropped" for ifo in instruments]
+            usage_routes = [f"{self.topic_prefix}ram_history"]
+            gates = [f"{gate}segments" for gate in ("statevector", "dqvector", "whiteht")]
+            seg_routes = [f"{self.topic_prefix}{ifo}_{gate}" for ifo in instruments for gate in gates]
-            instrument_latency_routes = ["%s_%s_latency" % (ifo, stage) for ifo in instruments for stage in ["datasource", "whitening", "snrSlice"]]
-            pipeline_latency_routes = ["all_%s_latency" % stage for stage in ["itacac"]]
+            instrument_latency_routes = [f"{self.topic_prefix}{ifo}_{stage}_latency" for ifo in instruments for stage in ["datasource", "whitening", "snrSlice"]]
+            pipeline_latency_routes = [f"{self.topic_prefix}all_{stage}_latency" for stage in ["itacac"]]
             topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, instrument_latency_routes, pipeline_latency_routes))
@@ -234,8 +234,8 @@ class EyeCandy(object):
             t, snr = float(event.end), event.snr
             self.ifo_snr_history[ifo].append((t, snr))
             if self.client is not None:
-                self.kafka_data["%s_snr_history" % ifo]["time"].append(t)
-                self.kafka_data["%s_snr_history" % ifo]["data"].append(snr)
+                self.kafka_data[f"{self.topic_prefix}{ifo}_snr_history"]["time"].append(t)
+                self.kafka_data[f"{self.topic_prefix}{ifo}_snr_history"]["data"].append(snr)
         if last_coincs:
             coinc_inspiral_index = last_coincs.coinc_inspiral_index
             coinc_event_index = last_coincs.coinc_event_index
@@ -282,14 +282,14 @@ class EyeCandy(object):
         if self.client is not None:
             for ii, column in enumerate(["time", "data"]):
-                self.kafka_data["latency_history"][column].append(float(self.latency_history[-1][ii]))
-                self.kafka_data["snr_history"][column].append(float(self.snr_history[-1][ii]))
+                self.kafka_data[f"{self.topic_prefix}latency_history"][column].append(float(self.latency_history[-1][ii]))
+                self.kafka_data[f"{self.topic_prefix}snr_history"][column].append(float(self.snr_history[-1][ii]))
             if max_likelihood is not None:
-                self.kafka_data["likelihood_history"]["time"].append(float(max_likelihood_t))
-                self.kafka_data["likelihood_history"]["data"].append(float(max_likelihood))
+                self.kafka_data[f"{self.topic_prefix}likelihood_history"]["time"].append(float(max_likelihood_t))
+                self.kafka_data[f"{self.topic_prefix}likelihood_history"]["data"].append(float(max_likelihood))
             if max_likelihood_far is not None:
-                self.kafka_data["far_history"]["time"].append(float(max_likelihood_t))
-                self.kafka_data["far_history"]["data"].append(float(max_likelihood_far))
+                self.kafka_data[f"{self.topic_prefix}far_history"]["time"].append(float(max_likelihood_t))
+                self.kafka_data[f"{self.topic_prefix}far_history"]["data"].append(float(max_likelihood_far))
         t = inspiral.now()
         if self.time_since_last_state is None:
@@ -299,7 +299,7 @@ class EyeCandy(object):
         if self.client is not None and (t - self.time_since_last_state) >= 1:
             self.time_since_last_state = t
             for ii, column in enumerate(["time", "data"]):
-                self.kafka_data["ram_history"][column].append(float(self.ram_history[-1][ii]))
+                self.kafka_data[f"{self.topic_prefix}ram_history"][column].append(float(self.ram_history[-1][ii]))
             # collect gate segments
             for gate in self.gate_history.keys():
@@ -317,8 +317,8 @@ class EyeCandy(object):
                 # regularly sample from on/off points
                 gate_times = numpy.arange(int(self.time_since_last_state), int(t + 1), 0.25)
                 gate_onoff = interp1d(gate_interp_times, gate_interp_onoff, kind='zero')(gate_times)
-                self.kafka_data["%s_%s" % (instrument, gate)]["time"].extend([t for t in gate_times if t >= self.time_since_last_state])
-                self.kafka_data["%s_%s" % (instrument, gate)]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state])
+                self.kafka_data[f"{self.topic_prefix}{instrument}_{gate}"]["time"].extend([t for t in gate_times if t >= self.time_since_last_state])
+                self.kafka_data[f"{self.topic_prefix}{instrument}_{gate}"]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state])
             # collect any new latencies
             if self.pipeline_latency_history:
@@ -326,8 +326,8 @@ class EyeCandy(object):
                 for instrument in self.pipeline_latency_history[stage].keys():
                     while (self.pipeline_latency_history[stage][instrument]):
                         data = self.pipeline_latency_history[stage][instrument].pop()
-                        self.kafka_data["%s_%s_latency" % (instrument, stage)]["time"].append(data[0])
-                        self.kafka_data["%s_%s_latency" % (instrument, stage)]["data"].append(data[1])
+                        self.kafka_data[f"{self.topic_prefix}{instrument}_{stage}_latency"]["time"].append(data[0])
+                        self.kafka_data[f"{self.topic_prefix}{instrument}_{stage}_latency"]["data"].append(data[1])
             # collect strain dropped samples
             for instrument, elem in self.strain.items():
@@ -336,8 +336,8 @@ class EyeCandy(object):
                 # because audiorate has to "add" samples when
                 # data is dropped.
                 # FIXME don't hard code the rate
-                self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t))
-                self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.)
+                self.kafka_data[f"{self.topic_prefix}{instrument}_strain_dropped"]["time"].append(float(t))
+                self.kafka_data[f"{self.topic_prefix}{instrument}_strain_dropped"]["data"].append(elem.get_property("add") / 16384.)
             # Send all of the kafka messages and clear the data
             #self.producer.send(self.tag, self.kafka_data)
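
As a rough sketch of the producer side after these changes: each prefixed route gets its own time/data buffer in kafka_data, so the inj and noninj streams never share a topic. The initialization shown is an assumption based on the route names in the diff, not code from the commit:

    # Hypothetical kafka_data layout: one {"time": [...], "data": [...]} buffer per prefixed route
    topic_prefix = "inj_"  # would be "" for the noninj stream (assumption)
    instruments = ["H1", "L1"]
    routes = [f"{topic_prefix}{ifo}_snr_history" for ifo in instruments]
    kafka_data = {route: {"time": [], "data": []} for route in routes}
    kafka_data[f"{topic_prefix}H1_snr_history"]["time"].append(1187008882.4)
    kafka_data[f"{topic_prefix}H1_snr_history"]["data"].append(12.3)
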