From edff9c610690146b6bd7ea37a3d1194b0c6ac324 Mon Sep 17 00:00:00 2001
From: "chad.hanna" <crh184@psu.edu>
Date: Thu, 22 Nov 2018 19:59:42 -0800
Subject: [PATCH] aggregator.py: update to new kafka data packing scheme

---
 gstlal-ugly/python/aggregator.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/gstlal-ugly/python/aggregator.py b/gstlal-ugly/python/aggregator.py
index 657ad89b2b..985916cb53 100644
--- a/gstlal-ugly/python/aggregator.py
+++ b/gstlal-ugly/python/aggregator.py
@@ -97,18 +97,18 @@ def get_data_from_kafka(jobs, routes, kafka_consumer, req_all = False, timeout =
 	bailout_time = now()
 	# FIXME this will block forever if it runs out of events unless
 	# consumer_timeout_ms (int) is set.  Should we let it block?
-	cnt = 0
+	maxt = 0
 	for message in kafka_consumer:
 		if message.topic not in jobs:
 			continue
-		# FIXME assumes json has only one key value pair
-		route, packet = message.value.items()[0]
-		if route not in routes:
-			continue
-		timedata[(message.topic, route)].append(float(packet.split()[0]))
-		datadata[(message.topic, route)].append(float(packet.split()[1]))
-		cnt += 1
-		if timedata[(message.topic, route)][-1] > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout):
+		for route in routes:
+			for x in message.value[route].split("\n"):
+				if x:
+					t,d = [float(y) for y in x.split()]
+					timedata[(message.topic, route)].append(t)
+					datadata[(message.topic, route)].append(d)
+					maxt = max(maxt, t)
+		if maxt > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout):
 			break
 	for k in timedata:
 		timedata[k] = numpy.array(timedata[k])
-- 
GitLab