Commit 99b0cfb8 authored by chad.hanna

lloidhandler: reduce some of the data going to kafka

parent efa319a1
@@ -260,6 +260,7 @@ class EyeCandy(object):
 		t = inspiral.now()
 		if self.time_since_last_state is None:
 			self.time_since_last_state = t
+		# NOTE only dump to kafka every 1 second
 		if self.producer is not None and (t - self.time_since_last_state) >= 1:
 			self.time_since_last_state = t
 			for ii, column in enumerate(["time", "data"]):
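(Not part of the commit.) The hunk above only adds a clarifying comment; the surrounding code already rate-limits Kafka output to at most one message per second. A minimal, self-contained sketch of that throttle pattern, using assumed names (ThrottledSender, maybe_send) rather than gstlal API:

import time

class ThrottledSender(object):
	"""Buffer metric payloads and push them to Kafka at most once per interval."""

	def __init__(self, producer, topic, interval=1.0):
		self.producer = producer	# e.g. a kafka-python KafkaProducer, or None to disable output
		self.topic = topic
		self.interval = interval
		self.last_send = None

	def maybe_send(self, payload, now=None):
		# Send only if a producer exists and at least `interval` seconds have
		# passed since the last send; otherwise the caller keeps accumulating data.
		now = time.time() if now is None else now
		if self.last_send is None:
			self.last_send = now
		if self.producer is not None and (now - self.last_send) >= self.interval:
			self.last_send = now
			# kafka-python's send() is asynchronous; no blocking flush() here,
			# matching the choice made later in this commit
			self.producer.send(self.topic, payload)
			return True
		return False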
@@ -284,9 +285,25 @@ class EyeCandy(object):
 				self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t))
 				self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.)
-			# Send and flush all of the kafka messages and clear the data
+			# decimate the data in the other routes
+			for route in self.kafka_data.keys():
+				if route == "coinc" or len(self.kafka_data[route]["data"]) == 0:
+					continue
+				if route == "likelihood_history" or route == "latency_history" or "snr_history" in route:
+					ix = numpy.argmax(self.kafka_data[route]["data"])
+					self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
+					self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
+				if route == "far_history":
+					ix = numpy.argmin(self.kafka_data[route]["data"])
+					self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]]
+					self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]]
+			# Send all of the kafka messages and clear the data
 			self.producer.send(self.tag, self.kafka_data)
-			self.producer.flush()
+			# This line forces the send but is blocking!! not the
+			# best idea for production running since we value
+			# latency over getting metric data out
+			#self.producer.flush()
 			for route in self.kafka_data.keys():
 				self.kafka_data[route] = {'time': [], 'data': []}
 			self.kafka_data["coinc"] = []
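(Not part of the commit.) For context, a standalone sketch of the decimation step added above, applied to a toy kafka_data payload; the helper name decimate_routes and the example values are illustrative only. History-style routes (likelihood_history, latency_history, any *snr_history) keep the sample with the largest value in the one-second window, while far_history keeps the smallest (most significant) false-alarm rate, so each route contributes at most one point per Kafka message.

import numpy

def decimate_routes(kafka_data):
	# Keep at most one (time, data) sample per route, mirroring the commit's logic.
	for route in kafka_data.keys():
		if route == "coinc" or len(kafka_data[route]["data"]) == 0:
			continue
		if route in ("likelihood_history", "latency_history") or "snr_history" in route:
			ix = numpy.argmax(kafka_data[route]["data"])
		elif route == "far_history":
			ix = numpy.argmin(kafka_data[route]["data"])
		else:
			continue
		kafka_data[route]["time"] = [kafka_data[route]["time"][ix]]
		kafka_data[route]["data"] = [kafka_data[route]["data"][ix]]
	return kafka_data

# Toy payload: three latency samples and three FAR samples gathered within one second.
payload = {
	"coinc": [],
	"latency_history": {"time": [100.0, 100.3, 100.8], "data": [5.2, 7.9, 6.1]},
	"far_history": {"time": [100.0, 100.3, 100.8], "data": [1e-4, 3e-6, 2e-5]},
}
print(decimate_routes(payload))
# latency_history keeps only the 7.9 s sample; far_history keeps only the 3e-6 Hz sample.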