Commit f6554291 authored by Patrick Godwin's avatar Patrick Godwin change how kafka topics/partitions are arranged, remove data... change how kafka topics/partitions are arranged, remove data decimation for kafka topics
parent d14774e5
......@@ -185,7 +185,11 @@ class EyeCandy(object):
if self.kafka_server is not None:
from kafka import KafkaProducer
self.producer = KafkaProducer(bootstrap_servers=[self.kafka_server], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
self.producer = KafkaProducer(
key_serializer=lambda m: json.dumps(m).encode('utf-8'),
value_serializer=lambda m: json.dumps(m).encode('utf-8'),
self.producer = None
......@@ -286,21 +290,10 @@ class EyeCandy(object):
self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t))
self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.)
# decimate the data in the other routes
for route in self.kafka_data.keys():
if route == "coinc" or len(self.kafka_data[route]["data"]) == 0:
if route == "likelihood_history" or route == "latency_history" or "snr_history" in route:
ix = numpy.argmax(self.kafka_data[route]["data"])
self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
if route == "far_history":
ix = numpy.argmin(self.kafka_data[route]["data"])
self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
# Send all of the kafka messages and clear the data
self.producer.send(self.tag, self.kafka_data)
#self.producer.send(self.tag, self.kafka_data)
for route in self.kafka_data.keys():
self.producer.send(route, key=self.tag, value=self.kafka_data[route])
# This line forces the send but is blocking!! not the
# best idea for production running since we value
# latency over getting metric data out
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment