diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py index b4b9264ea5e254a0d0db1677f022319aa8ca824e..3e1225daab430e52252f33d44b2ebd0d6c0f2da4 100644 --- a/gstlal-inspiral/python/lloidhandler.py +++ b/gstlal-inspiral/python/lloidhandler.py @@ -186,7 +186,11 @@ class EyeCandy(object): if self.kafka_server is not None: from kafka import KafkaProducer - self.producer = KafkaProducer(bootstrap_servers=[self.kafka_server], value_serializer=lambda m: json.dumps(m).encode('utf-8')) + self.producer = KafkaProducer( + bootstrap_servers=[self.kafka_server], + key_serializer=lambda m: json.dumps(m).encode('utf-8'), + value_serializer=lambda m: json.dumps(m).encode('utf-8'), + ) else: self.producer = None @@ -287,21 +291,10 @@ class EyeCandy(object): self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t)) self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.) - # decimate the data in the other routes - for route in self.kafka_data.keys(): - if route == "coinc" or len(self.kafka_data[route]["data"]) == 0: - continue - if route == "likelihood_history" or route == "latency_history" or "snr_history" in route: - ix = numpy.argmax(self.kafka_data[route]["data"]) - self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]] - self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]] - if route == "far_history": - ix = numpy.argmin(self.kafka_data[route]["data"]) - self.kafka_data[route]["time"] = [self.kafka_data[route]["time"][ix]] - self.kafka_data[route]["data"] = [self.kafka_data[route]["data"][ix]] - # Send all of the kafka messages and clear the data - self.producer.send(self.tag, self.kafka_data) + #self.producer.send(self.tag, self.kafka_data) + for route in self.kafka_data.keys(): + self.producer.send(route, key=self.tag, value=self.kafka_data[route]) # This line forces the send but is blocking!! not the # best idea for production running since we value # latency over getting metric data out