From 68b0a00a626434c4ae908f7eb6be4c9d41ac83fd Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Sun, 10 Feb 2019 16:02:37 -0800
Subject: [PATCH] lloidhandler.py: restructure the data stored and pushed to
 kafka within EyeCandy, rename the state routes, and handle state-info
 sampling rates here rather than downstream

---
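Notes (for reviewers; not part of the commit message):

Each kafka route now carries a dict of parallel "time" and "data" lists
rather than a tab-delimited string, the state route names use underscores
instead of slashes (e.g. "H1_dqvector_on" rather than "H1/dqvector_on"),
and the on/off/gap sample counters are divided by each detector's state
sample rate before being pushed, so consumers see seconds rather than raw
sample counts. A rough sketch of the payload shape now sent to the topic
named by self.tag (the values are illustrative only):

    message = {
        "snr_history": {"time": [1233700000.0], "data": [5.8]},
        "H1_snr_history": {"time": [1233700000.0], "data": [4.1]},
        "H1_dqvector_on": {"time": [1233700001.0], "data": [30.0]},
        "coinc": [{"combined_far": 1e-07, "snr": 5.8}],
    }
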
 gstlal-inspiral/python/lloidhandler.py | 61 ++++++++++++--------------
 1 file changed, 28 insertions(+), 33 deletions(-)

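Because the sample-rate normalization now happens in EyeCandy, a downstream
consumer no longer needs per-detector rate handling. A minimal consumer
sketch, assuming the kafka-python package, a broker at localhost:9092, and
a hypothetical topic name standing in for self.tag:

    import json
    from kafka import KafkaConsumer

    # topic name and broker address are assumptions for illustration
    consumer = KafkaConsumer(
        "gstlal_inspiral",
        bootstrap_servers=["localhost:9092"],
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
    for message in consumer:
        payload = message.value
        # state/DQ routes arrive already converted to seconds
        series = payload.get("V1_dqvector_on", {"time": [], "data": []})
        if series["time"]:
            print("V1 DQ on: t=%s, %s s" % (series["time"][-1], series["data"][-1]))
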
diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py
index 1cb4b738ce..5efbbe2d1d 100644
--- a/gstlal-inspiral/python/lloidhandler.py
+++ b/gstlal-inspiral/python/lloidhandler.py
@@ -46,7 +46,7 @@
 #
 
 
-from collections import deque
+from collections import defaultdict, deque
 try:
 	from fpconst import NaN
 	from fpconst import PosInf
@@ -145,6 +145,7 @@ class EyeCandy(object):
 		self.far_history = deque(maxlen = 300)
 		self.ram_history = deque(maxlen = 2)
 		self.ifo_snr_history = dict((instrument, deque(maxlen = 300)) for instrument in instruments)
+		self.state_sample_rates = {"H1": 16., "L1": 16., "V1": 1.}  # state/DQ vector sample rates in Hz (floats so the division below yields seconds)
 		self.dqvectors = {}
 		self.statevectors = {}
 		self.strain = {}
@@ -183,7 +184,7 @@ class EyeCandy(object):
 
 		if self.kafka_server is not None:
 			from kafka import KafkaProducer
-			self.producer = KafkaProducer(bootstrap_servers=[self.kafka_server], value_serializer=lambda m: json.dumps(m).encode('ascii'))
+			self.producer = KafkaProducer(bootstrap_servers=[self.kafka_server], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
 		else:
 			self.producer = None
 
@@ -192,22 +193,8 @@ class EyeCandy(object):
 		# maintaining the bottle route methods, we should keep this a
 		# bit separate for now to not disrupt too much.
 
-		self.kafka_data = {}
-		self.kafka_data["latency_history"] = ""
-		self.kafka_data["likelihood_history"] = ""
-		self.kafka_data["snr_history"] = ""
-		self.kafka_data["far_history"] = ""
-		self.kafka_data["ram_history"] = ""
-		self.kafka_data["coinc"] = ""
-		for instrument in instruments:
-			self.kafka_data["%s_snr_history" % instrument] = ""
-			self.kafka_data["%s/dqvector_on" % instrument] = ""
-			self.kafka_data["%s/dqvector_off" % instrument] = ""
-			self.kafka_data["%s/dqvector_gap" % instrument] = ""
-			self.kafka_data["%s/statevector_on" % instrument] = ""
-			self.kafka_data["%s/statevector_off" % instrument] = ""
-			self.kafka_data["%s/statevector_gap" % instrument] = ""
-			self.kafka_data["%s/strain_dropped" % instrument] = ""
+		self.kafka_data = defaultdict(lambda: {'time': [], 'data': []})
+		self.kafka_data["coinc"] = []
 
 	def update(self, events, last_coincs):
 		self.ram_history.append((float(lal.UTCToGPS(time.gmtime())), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.)) # GB
@@ -215,7 +202,8 @@ class EyeCandy(object):
 			max_snr_event = max(events, key = lambda event: event.snr)
 			self.ifo_snr_history[max_snr_event.ifo].append((float(max_snr_event.end), max_snr_event.snr))
 			if self.producer is not None:
-				self.kafka_data["%s_snr_history" % max_snr_event.ifo] += "%s\t%s\n" % (self.ifo_snr_history[max_snr_event.ifo][-1][0], self.ifo_snr_history[max_snr_event.ifo][-1][1])
+				for ii, column in enumerate(["time", "data"]):
+					self.kafka_data["%s_snr_history" % max_snr_event.ifo][column].append(float(self.ifo_snr_history[max_snr_event.ifo][-1][ii]))
 		if last_coincs:
 			coinc_inspiral_index = last_coincs.coinc_inspiral_index
 			coinc_event_index = last_coincs.coinc_event_index
@@ -239,7 +227,7 @@ class EyeCandy(object):
 						coinc_dict["%s_%s" % (sngl_row.ifo, attr)] = float(getattr(sngl_row, attr))
 					coinc_dict["%s_end" % sngl_row.ifo] = float(sngl_row.end)
 				coinc_dict_list.append(coinc_dict)
-			self.kafka_data["coinc"] += json.dumps(coinc_dict_list)
+			self.kafka_data["coinc"].extend(coinc_dict_list)
 			for coinc_inspiral in coinc_inspiral_index.values():
 				# latency in .minimum_duration
 				# FIXME:  update when a proper column is available
@@ -257,38 +245,45 @@ class EyeCandy(object):
 			self.far_history.append((max_likelihood_t, max_likelihood_far))
 
 			if self.producer is not None:
-				self.kafka_data["latency_history"] += "%s\t%s\n" % (self.latency_history[-1][0], self.latency_history[-1][1])
-				self.kafka_data["snr_history"] += "%s\t%s\n" % (self.snr_history[-1][0], self.snr_history[-1][1])
-				self.kafka_data["likelihood_history"] += "%s\t%s\n" % (self.likelihood_history[-1][0], self.likelihood_history[-1][1])
-				self.kafka_data["far_history"] += "%s\t%s\n" % (self.far_history[-1][0], self.far_history[-1][1])
+				for ii, column in enumerate(["time", "data"]):
+					self.kafka_data["latency_history"][column].append(float(self.latency_history[-1][ii]))
+					self.kafka_data["snr_history"][column].append(float(self.snr_history[-1][ii]))
+					self.kafka_data["likelihood_history"][column].append(float(self.likelihood_history[-1][ii]))
+					self.kafka_data["far_history"][column].append(float(self.far_history[-1][ii]))
 
 		t = inspiral.now()
 		if self.time_since_last_state is None:
 			self.time_since_last_state = t
 		if self.producer is not None and (t - self.time_since_last_state) > 1:
 			self.time_since_last_state = t
-			self.kafka_data["ram_history"] += "%s\t%s" % (self.ram_history[-1][0], self.ram_history[-1][1])
+			for ii, column in enumerate(["time", "data"]):
+				self.kafka_data["ram_history"][column].append(float(self.ram_history[-1][ii]))
+
 			# send the state vector and dq vector information to kafka
+			# FIXME: state sample rate hack to adjust for the 16 Hz sample rate of aLIGO vs the 1 Hz rate of Virgo
 			for instrument, elem in self.dqvectors.items():
-				self.kafka_data["%s/dqvector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
-				self.kafka_data["%s/dqvector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
-				self.kafka_data["%s/dqvector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
+				for state in ["on", "off", "gap"]:
+					self.kafka_data["%s_dqvector_%s" % (instrument, state)]["time"].append(float(t))
+					self.kafka_data["%s_dqvector_%s" % (instrument, state)]["data"].append(elem.get_property("%s-samples" % state) / self.state_sample_rates[instrument])
 			for instrument, elem in self.statevectors.items():
-				self.kafka_data["%s/statevector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
-				self.kafka_data["%s/statevector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
-				self.kafka_data["%s/statevector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
+				for state in ["on", "off", "gap"]:
+					self.kafka_data["%s_statevector_%s" % (instrument, state)]["time"].append(float(t))
+					self.kafka_data["%s_statevector_%s" % (instrument, state)]["data"].append(elem.get_property("%s-samples" % state) / self.state_sample_rates[instrument])
 			for instrument, elem in self.strain.items():
 				# I know the name is strain_drop even though it
 				# comes from the "add" property. that is
 				# because audiorate has to "add" samples when
 				# data is dropped.
 				# FIXME don't hard code the rate
-				self.kafka_data["%s/strain_dropped" % instrument] += "%s\t%s" % (t, elem.get_property("add") / 16384.)
+				self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t))
+				self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.)
 
 			# Send and flush all of the kafka messages and clear the data
 			self.producer.send(self.tag, self.kafka_data)
 			self.producer.flush()
-			for k in self.kafka_data: self.kafka_data[k] = ""
+			for route in self.kafka_data.keys():
+				self.kafka_data[route] = {'time': [], 'data': []}
+			self.kafka_data["coinc"] = []
 
 	def web_get_latency_histogram(self):
 		with self.lock:
-- 
GitLab