diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py index cd093226604ac13d05a21df935618cdd3f53a43b..e01a7d8c7e6920dd788f339256050edd91b9fffb 100644 --- a/gstlal-inspiral/python/lloidhandler.py +++ b/gstlal-inspiral/python/lloidhandler.py @@ -260,6 +260,7 @@ class EyeCandy(object): t = inspiral.now() if self.time_since_last_state is None: self.time_since_last_state = t + # NOTE only dump to kafka every 1 seconds if self.producer is not None and (t - self.time_since_last_state) >= 1: self.time_since_last_state = t for ii, column in enumerate(["time", "data"]): @@ -284,9 +285,25 @@ class EyeCandy(object): self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t)) self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.) - # Send and flush all of the kafka messages and clear the data + # decimate the data in the other routes + for route in self.kafka_data.keys(): + if route == "coinc" or len(self.kafka_data[route]["data"]) == 0: + continue + if route == "likelihood_history" or route == "latency_history" or "snr_history" in route: + ix = numpy.argmax(self.kafka_data[route]["data"]) + self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]] + self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]] + if route == "far_history": + ix = numpy.argmin(self.kafka_data[route]["data"]) + self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]] + self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]] + + # Send all of the kafka messages and clear the data self.producer.send(self.tag, self.kafka_data) - self.producer.flush() + # This line forces the send but is blocking!! not the + # best idea for production running since we value + # latency over getting metric data out + #self.producer.flush() for route in self.kafka_data.keys(): self.kafka_data[route] = {'time': [], 'data': []} self.kafka_data["coinc"] = []