From 99b0cfb846515d789c5d02aa1584723c78ea1f83 Mon Sep 17 00:00:00 2001
From: "chad.hanna" <crh184@psu.edu>
Date: Wed, 20 Mar 2019 17:38:32 -0700
Subject: [PATCH] lloidhandler: reduce some of the data going to kafka

---
 gstlal-inspiral/python/lloidhandler.py | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py
index cd09322660..e01a7d8c7e 100644
--- a/gstlal-inspiral/python/lloidhandler.py
+++ b/gstlal-inspiral/python/lloidhandler.py
@@ -260,6 +260,7 @@ class EyeCandy(object):
 		t = inspiral.now()
 		if self.time_since_last_state is None:
 			self.time_since_last_state = t
+		# NOTE only dump to kafka every 1 seconds
 		if self.producer is not None and (t - self.time_since_last_state) >= 1:
 			self.time_since_last_state = t
 			for ii, column in enumerate(["time", "data"]):
@@ -284,9 +285,25 @@ class EyeCandy(object):
 				self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t))
 				self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.)
 
-			# Send and flush all of the kafka messages and clear the data
+			# decimate the data in the other routes
+			for route in self.kafka_data.keys():
+				if route == "coinc" or len(self.kafka_data[route]["data"]) == 0:
+					continue
+				if route == "likelihood_history" or route == "latency_history" or "snr_history" in route:
+					ix = numpy.argmax(self.kafka_data[route]["data"])
+					self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
+					self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
+				if route == "far_history":
+					ix = numpy.argmin(self.kafka_data[route]["data"])
+					self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
+					self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
+
+			# Send all of the kafka messages and clear the data
 			self.producer.send(self.tag, self.kafka_data)
-			self.producer.flush()
+			# This line forces the send but is blocking!! not the
+			# best idea for production running since we value
+			# latency over getting metric data out
+			#self.producer.flush()
 			for route in self.kafka_data.keys():
 				self.kafka_data[route] = {'time': [], 'data': []}
 			self.kafka_data["coinc"] = []
-- 
GitLab