Skip to content
Snippets Groups Projects
Commit edff9c61 authored by chad.hanna's avatar chad.hanna
Browse files

aggregator.py: update to new kafka data packing scheme

parent aa29b94a
No related branches found
No related tags found
No related merge requests found
......@@ -97,18 +97,18 @@ def get_data_from_kafka(jobs, routes, kafka_consumer, req_all = False, timeout =
bailout_time = now()
# FIXME this will block forever if it runs out of events unless
# consumer_timeout_ms (int) is set. Should we let it block?
cnt = 0
maxt = 0
for message in kafka_consumer:
if message.topic not in jobs:
continue
# FIXME assumes json has only one key value pair
route, packet = message.value.items()[0]
if route not in routes:
continue
timedata[(message.topic, route)].append(float(packet.split()[0]))
datadata[(message.topic, route)].append(float(packet.split()[1]))
cnt += 1
if timedata[(message.topic, route)][-1] > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout):
for route in routes:
for x in message.value[route].split("\n"):
if x:
t,d = [float(y) for y in x.split()]
timedata[(message.topic, route)].append(t)
datadata[(message.topic, route)].append(d)
maxt = max(maxt, t)
if maxt > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout):
break
for k in timedata:
timedata[k] = numpy.array(timedata[k])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment