diff --git a/gstlal-inspiral/python/dags/layers/inspiral.py b/gstlal-inspiral/python/dags/layers/inspiral.py index 10b3f0352d60196a48ac1bbf5d1571d378b00120..8e052feba8b07dce3794d2c9522dd4ad972d769f 100644 --- a/gstlal-inspiral/python/dags/layers/inspiral.py +++ b/gstlal-inspiral/python/dags/layers/inspiral.py @@ -1921,20 +1921,29 @@ def collect_metrics_layer(config, dag): common_opts = [ Argument("command", "aggregate"), Option("config", config.metrics.scald_config), + Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"), ] + # set topic_prefix to distinguish inj and noninj topics + topic_prefix = ['', 'inj_'] + # define metrics used for aggregation jobs - snr_metrics = [f"{ifo}_snr_history" for ifo in config.ifos] - network_metrics = ["likelihood_history", "snr_history", "latency_history", "far_history"] - state_metrics = [f"{ifo}_strain_dropped" for ifo in config.ifos] - usage_metrics = ["ram_history"] - latency_metrics = [f"{ifo}_{stage}_latency" for ifo in config.ifos for stage in ("datasource", "whitening", "snrSlice")] - latency_metrics.append("all_itacac_latency") + snr_metrics = [f"{prefix}{ifo}_snr_history" for ifo in config.ifos for prefix in topic_prefix] + network_metrics = [] + for prefix in topic_prefix: + network_metrics.append(f"{prefix}likelihood_history") + network_metrics.append(f"{prefix}snr_history") + network_metrics.append(f"{prefix}latency_history") + network_metrics.append(f"{prefix}far_history") + state_metrics = [f"{prefix}{ifo}_strain_dropped" for ifo in config.ifos for prefix in topic_prefix] + usage_metrics = [f"{prefix}ram_history" for prefix in topic_prefix] + latency_metrics = [f"{prefix}{ifo}_{stage}_latency" for ifo in config.ifos for stage in ("datasource", "whitening", "snrSlice") for prefix in topic_prefix] + for prefix in topic_prefix: latency_metrics.append(f"{prefix}all_itacac_latency") agg_metrics = list(itertools.chain(snr_metrics, network_metrics, usage_metrics, state_metrics, latency_metrics)) gates = [f"{gate}segments" for gate in ("statevector", "dqvector", "whiteht")] - seg_metrics = [f"{ifo}_{gate}" for ifo in config.ifos for gate in gates] + seg_metrics = [f"{prefix}{ifo}_{gate}" for ifo in config.ifos for gate in gates for prefix in topic_prefix] # set up partitioning # FIXME don't hard code the 1000 @@ -1951,11 +1960,9 @@ def collect_metrics_layer(config, dag): # add jobs to consume metrics from non-injection jobs arguments = list(common_opts) arguments.extend([ - Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"), Option("data-type", "timeseries"), Option("topic", [f"gstlal.{config.tag}.{metric}" for metric in metrics]), Option("schema", metrics), - Option("tag", "noninj"), ]) # elect first metric collector as leader diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py index 65cb3322068c0b771886051f4b5db3adc0aa0263..dc5cf42a3bddc39caf91b3affe86361d3113e5fc 100644 --- a/gstlal-inspiral/python/lloidhandler.py +++ b/gstlal-inspiral/python/lloidhandler.py @@ -198,15 +198,15 @@ class EyeCandy(object): if self.kafka_server is not None: from ligo.scald.io import kafka - snr_routes = ["%s_snr_history" % ifo for ifo in instruments] - network_routes = ["likelihood_history", "snr_history", "latency_history"] - state_routes = ["%s_strain_dropped" % ifo for ifo in instruments] - usage_routes = ["ram_history"] - gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")] - seg_routes = ["%s_%s" % (ifo, gate) for ifo in instruments for gate in gates] + snr_routes = [f"{self.topic_prefix}{ifo}_snr_history" for ifo in instruments] + network_routes = [f"{self.topic_prefix}likelihood_history", f"{self.topic_prefix}snr_history", f"{self.topic_prefix}latency_history"] + state_routes = [f"{self.topic_prefix}{ifo}_strain_dropped" for ifo in instruments] + usage_routes = [f"{self.topic_prefix}ram_history"] + gates = [f"{self.topic_prefix}{gate}segments" for gate in ("statevector", "dqvector", "whiteht")] + seg_routes = [f"{self.topic_prefix}{ifo}_{gate}" for ifo in instruments for gate in gates] - instrument_latency_routes = ["%s_%s_latency" % (ifo, stage) for ifo in instruments for stage in ["datasource", "whitening", "snrSlice"]] - pipeline_latency_routes = ["all_%s_latency" % stage for stage in ["itacac"]] + instrument_latency_routes = [f"{self.topic_prefix}{ifo}_{stage}_latency" for ifo in instruments for stage in ["datasource", "whitening", "snrSlice"]] + pipeline_latency_routes = [f"{self.topic_prefix}_all_{stage}_latency" for stage in ["itacac"]] topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, instrument_latency_routes, pipeline_latency_routes)) @@ -234,8 +234,8 @@ class EyeCandy(object): t, snr = float(event.end), event.snr self.ifo_snr_history[ifo].append((t, snr)) if self.client is not None: - self.kafka_data["%s_snr_history" % ifo]["time"].append(t) - self.kafka_data["%s_snr_history" % ifo]["data"].append(snr) + self.kafka_data[f"{self.topic_prefix}{ifo}_snr_history"]["time"].append(t) + self.kafka_data[f"{self.topic_prefix}{ifo}_snr_history"]["data"].append(snr) if last_coincs: coinc_inspiral_index = last_coincs.coinc_inspiral_index coinc_event_index = last_coincs.coinc_event_index @@ -282,14 +282,14 @@ class EyeCandy(object): if self.client is not None: for ii, column in enumerate(["time", "data"]): - self.kafka_data["latency_history"][column].append(float(self.latency_history[-1][ii])) - self.kafka_data["snr_history"][column].append(float(self.snr_history[-1][ii])) + self.kafka_data[f"{self.topic_prefix}latency_history"][column].append(float(self.latency_history[-1][ii])) + self.kafka_data[f"{self.topic_prefix}snr_history"][column].append(float(self.snr_history[-1][ii])) if max_likelihood is not None: - self.kafka_data["likelihood_history"]["time"].append(float(max_likelihood_t)) - self.kafka_data["likelihood_history"]["data"].append(float(max_likelihood)) + self.kafka_data[f"{self.topic_prefix}likelihood_history"]["time"].append(float(max_likelihood_t)) + self.kafka_data[f"{self.topic_prefix}likelihood_history"]["data"].append(float(max_likelihood)) if max_likelihood_far is not None: - self.kafka_data["far_history"]["time"].append(float(max_likelihood_t)) - self.kafka_data["far_history"]["data"].append(float(max_likelihood_far)) + self.kafka_data[f"{self.topic_prefix}far_history"]["time"].append(float(max_likelihood_t)) + self.kafka_data[f"{self.topic_prefix}far_history"]["data"].append(float(max_likelihood_far)) t = inspiral.now() if self.time_since_last_state is None: @@ -299,7 +299,7 @@ class EyeCandy(object): if self.client is not None and (t - self.time_since_last_state) >= 1: self.time_since_last_state = t for ii, column in enumerate(["time", "data"]): - self.kafka_data["ram_history"][column].append(float(self.ram_history[-1][ii])) + self.kafka_data[f"{self.topic_prefix}ram_history"][column].append(float(self.ram_history[-1][ii])) # collect gate segments for gate in self.gate_history.keys(): @@ -317,8 +317,8 @@ class EyeCandy(object): # regularly sample from on/off points gate_times = numpy.arange(int(self.time_since_last_state), int(t + 1), 0.25) gate_onoff = interp1d(gate_interp_times, gate_interp_onoff, kind='zero')(gate_times) - self.kafka_data["%s_%s" % (instrument, gate)]["time"].extend([t for t in gate_times if t >= self.time_since_last_state]) - self.kafka_data["%s_%s" % (instrument, gate)]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state]) + self.kafka_data[f"{self.topic_prefix}{instrument}_{gate}"]["time"].extend([t for t in gate_times if t >= self.time_since_last_state]) + self.kafka_data[f"{self.topic_prefix}{instrument}_{gate}"]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state]) # collect any new latencies if self.pipeline_latency_history: @@ -326,8 +326,8 @@ class EyeCandy(object): for instrument in self.pipeline_latency_history[stage].keys(): while (self.pipeline_latency_history[stage][instrument]): data = self.pipeline_latency_history[stage][instrument].pop() - self.kafka_data["%s_%s_latency" % (instrument, stage)]["time"].append(data[0]) - self.kafka_data["%s_%s_latency" % (instrument, stage)]["data"].append(data[1]) + self.kafka_data[f"{self.topic_prefix}{instrument}_{stage}_latency"]["time"].append(data[0]) + self.kafka_data[f"{self.topic_prefix}{instrument}_{stage}_latency"]["data"].append(data[1]) # collect strain dropped samples for instrument, elem in self.strain.items(): @@ -336,8 +336,8 @@ class EyeCandy(object): # because audiorate has to "add" samples when # data is dropped. # FIXME don't hard code the rate - self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t)) - self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.) + self.kafka_data[f"{self.topic_prefix}{instrument}_strain_dropped"]["time"].append(float(t)) + self.kafka_data[f"{self.topic_prefix}{instrument}_strain_dropped"]["data"].append(elem.get_property("add") / 16384.) # Send all of the kafka messages and clear the data #self.producer.send(self.tag, self.kafka_data)