Commit e7517778 authored by Patrick Godwin's avatar Patrick Godwin
Browse files change how kafka topics/partitions are arranged, remove data... change how kafka topics/partitions are arranged, remove data decimation for kafka topics
parent 90a2e91d
......@@ -186,7 +186,11 @@ class EyeCandy(object):
if self.kafka_server is not None:
from kafka import KafkaProducer
self.producer = KafkaProducer(bootstrap_servers=[self.kafka_server], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
self.producer = KafkaProducer(
key_serializer=lambda m: json.dumps(m).encode('utf-8'),
value_serializer=lambda m: json.dumps(m).encode('utf-8'),
self.producer = None
......@@ -287,21 +291,10 @@ class EyeCandy(object):
self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t))
self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.)
# decimate the data in the other routes
for route in self.kafka_data.keys():
if route == "coinc" or len(self.kafka_data[route]["data"]) == 0:
if route == "likelihood_history" or route == "latency_history" or "snr_history" in route:
ix = numpy.argmax(self.kafka_data[route]["data"])
self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
if route == "far_history":
ix = numpy.argmin(self.kafka_data[route]["data"])
self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
# Send all of the kafka messages and clear the data
self.producer.send(self.tag, self.kafka_data)
#self.producer.send(self.tag, self.kafka_data)
for route in self.kafka_data.keys():
self.producer.send(route, key=self.tag, value=self.kafka_data[route])
# This line forces the send but is blocking!! not the
# best idea for production running since we value
# latency over getting metric data out
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment