From aa29b94ac97609df95b9efdd2b34fed52a8148f2 Mon Sep 17 00:00:00 2001
From: "chad.hanna" <crh184@psu.edu>
Date: Thu, 22 Nov 2018 19:58:07 -0800
Subject: [PATCH] lloidhandler: make fewer kafka messages

---
 gstlal-inspiral/python/lloidhandler.py | 51 +++++++++++++++++++-------
 1 file changed, 37 insertions(+), 14 deletions(-)

diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py
index 8424e957d5..51e485333a 100644
--- a/gstlal-inspiral/python/lloidhandler.py
+++ b/gstlal-inspiral/python/lloidhandler.py
@@ -187,13 +187,34 @@ class EyeCandy(object):
 		else:
 			self.producer = None
 
+		# FIXME: it is redundant to store kafka data like this since
+		# we already have all the other data structures, but since we
+		# are also maintaining the bottle route methods, we should
+		# keep this separate for now so as not to disrupt too much.
+
+		self.kafka_data = {}
+		self.kafka_data["latency_history"] = ""
+		self.kafka_data["likelihood_history"] = ""
+		self.kafka_data["snr_history"] = ""
+		self.kafka_data["far_history"] = ""
+		self.kafka_data["ram_history"] = ""
+		for instrument in instruments:
+			self.kafka_data["%s_snr_history" % instrument] = ""
+			self.kafka_data["%s/dqvector_on" % instrument] = ""
+			self.kafka_data["%s/dqvector_off" % instrument] = ""
+			self.kafka_data["%s/dqvector_gap" % instrument] = ""
+			self.kafka_data["%s/statevector_on" % instrument] = ""
+			self.kafka_data["%s/statevector_off" % instrument] = ""
+			self.kafka_data["%s/statevector_gap" % instrument] = ""
+			self.kafka_data["%s/strain_dropped" % instrument] = ""
+
 	def update(self, events, last_coincs):
 		self.ram_history.append((float(lal.UTCToGPS(time.gmtime())), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.)) # GB
 		if events:
 			max_snr_event = max(events, key = lambda event: event.snr)
 			self.ifo_snr_history[max_snr_event.ifo].append((float(max_snr_event.end), max_snr_event.snr))
 			if self.producer is not None:
-				self.producer.send(self.tag, {"%s_snr_history" % max_snr_event.ifo: "%s\t%s" % (self.ifo_snr_history[max_snr_event.ifo][-1][0], self.ifo_snr_history[max_snr_event.ifo][-1][1])})
+				self.kafka_data["%s_snr_history" % max_snr_event.ifo] += "%s\t%s\n" % (self.ifo_snr_history[max_snr_event.ifo][-1][0], self.ifo_snr_history[max_snr_event.ifo][-1][1])
 		if last_coincs:
 			coinc_inspiral_index = last_coincs.coinc_inspiral_index
 			coinc_event_index = last_coincs.coinc_event_index
@@ -214,36 +235,38 @@ class EyeCandy(object):
 			self.far_history.append((max_likelihood_t, max_likelihood_far))
 
 			if self.producer is not None:
-				self.producer.send(self.tag, {"latency_history": "%s\t%s" % (self.latency_history[-1][0], self.latency_history[-1][1])})
-				self.producer.send(self.tag, {"snr_history": "%s\t%s" % (self.snr_history[-1][0], self.snr_history[-1][1])})
-				self.producer.send(self.tag, {"likelihood_history": "%s\t%s" % (self.likelihood_history[-1][0], self.likelihood_history[-1][1])})
-				self.producer.send(self.tag, {"far_history": "%s\t%s" % (self.far_history[-1][0], self.far_history[-1][1])})
+				self.kafka_data["latency_history"] += "%s\t%s\n" % (self.latency_history[-1][0], self.latency_history[-1][1])
+				self.kafka_data["snr_history"] += "%s\t%s\n" % (self.snr_history[-1][0], self.snr_history[-1][1])
+				self.kafka_data["likelihood_history"] += "%s\t%s\n" % (self.likelihood_history[-1][0], self.likelihood_history[-1][1])
+				self.kafka_data["far_history"] += "%s\t%s\n" % (self.far_history[-1][0], self.far_history[-1][1])
 
 		t = inspiral.now()
 		if self.time_since_last_state is None:
 			self.time_since_last_state = t
 		if self.producer is not None and (t - self.time_since_last_state) > 1:
 			self.time_since_last_state = t
-			self.producer.send(self.tag, {"ram_history": "%s\t%s" % (self.ram_history[-1][0], self.ram_history[-1][1])})
+			self.kafka_data["ram_history"] += "%s\t%s" % (self.ram_history[-1][0], self.ram_history[-1][1])
 			# send the state vector and dq vector information to kafka
 			for instrument, elem in self.dqvectors.items():
-				self.producer.send(self.tag, {"%s/dqvector_on" % instrument: "%s\t%s" % (t, elem.get_property("on-samples"))})
-				self.producer.send(self.tag, {"%s/dqvector_off" % instrument: "%s\t%s" % (t, elem.get_property("off-samples"))})
-				self.producer.send(self.tag, {"%s/dqvector_gap" % instrument: "%s\t%s" % (t, elem.get_property("gap-samples"))})
+				self.kafka_data["%s/dqvector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
+				self.kafka_data["%s/dqvector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
+				self.kafka_data["%s/dqvector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
 			for instrument, elem in self.statevectors.items():
-				self.producer.send(self.tag, {"%s/statevector_on" % instrument: "%s\t%s" % (t, elem.get_property("on-samples"))})
-				self.producer.send(self.tag, {"%s/statevector_off" % instrument: "%s\t%s" % (t, elem.get_property("off-samples"))})
-				self.producer.send(self.tag, {"%s/statevector_gap" % instrument: "%s\t%s" % (t, elem.get_property("gap-samples"))})
+				self.kafka_data["%s/statevector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
+				self.kafka_data["%s/statevector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
+				self.kafka_data["%s/statevector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
 			for instrument, elem in self.strain.items():
 				# I know the name is strain_drop even though it
 				# comes from the "add" property. that is
 				# because audiorate has to "add" samples when
 				# data is dropped.
 				# FIXME don't hard code the rate
-				self.producer.send(self.tag, {"%s/strain_dropped" % instrument: "%s\t%s" % (t, elem.get_property("add") / 16384.)})
+				self.kafka_data["%s/strain_dropped" % instrument] += "%s\t%s" % (t, elem.get_property("add") / 16384.)
 
-			# Actually flush all of the kafka messages
+			# Send and flush all of the kafka messages and clear the data
+			self.producer.send(self.tag, self.kafka_data)
 			self.producer.flush()
+			for k in self.kafka_data: self.kafka_data[k] = ""
 
 	def web_get_latency_histogram(self):
 		with self.lock:
-- 
GitLab