diff --git a/gstlal-ugly/python/aggregator.py b/gstlal-ugly/python/aggregator.py index 657ad89b2bccd6ad959fb4bf0b68f81e07bb0bf0..985916cb53c30fb533c18c94d1f302527d725b43 100644 --- a/gstlal-ugly/python/aggregator.py +++ b/gstlal-ugly/python/aggregator.py @@ -97,18 +97,18 @@ def get_data_from_kafka(jobs, routes, kafka_consumer, req_all = False, timeout = bailout_time = now() # FIXME this will block forever if it runs out of events unless # consumer_timeout_ms (int) is set. Should we let it block? - cnt = 0 + maxt = 0 for message in kafka_consumer: if message.topic not in jobs: continue - # FIXME assumes json has only one key value pair - route, packet = message.value.items()[0] - if route not in routes: - continue - timedata[(message.topic, route)].append(float(packet.split()[0])) - datadata[(message.topic, route)].append(float(packet.split()[1])) - cnt += 1 - if timedata[(message.topic, route)][-1] > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout): + for route in routes: + for x in message.value[route].split("\n"): + if x: + t,d = [float(y) for y in x.split()] + timedata[(message.topic, route)].append(t) + datadata[(message.topic, route)].append(d) + maxt = max(maxt, t) + if maxt > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout): break for k in timedata: timedata[k] = numpy.array(timedata[k])