From 87c1683a73b9511fa38fb3714189a9c6f1378faa Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Wed, 25 Oct 2017 05:56:52 -0700
Subject: [PATCH] gstlal_etg: added ability to transfer ETG data through Kafka

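Adds a --use-kafka mode to gstlal_etg. When enabled, each feature vector
subset is serialized to JSON and published to a Kafka topic with a
confluent_kafka Producer instead of being appended to the in-memory
etg_data buffer; the new --etg-partition, --kafka-topic and
--kafka-server options configure the producer.

For reference, a downstream job could read the published subsets back
with a confluent_kafka Consumer roughly as follows. This is only a
minimal sketch and not part of this patch: the server address, group id
and topic name below are placeholders and should match whatever is
passed to gstlal_etg.

    from confluent_kafka import Consumer
    import json

    # placeholder settings; use the same server/topic given to gstlal_etg
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'etg_consumer',
    })
    consumer.subscribe(['etg_features'])

    while True:
        msg = consumer.poll(timeout = 1.0)
        if msg is None or msg.error():
            continue
        # each message value is a JSON-encoded list of features for one
        # ETG event time; the event time is carried as the message timestamp
        ts_type, event_time = msg.timestamp()
        etg_event = json.loads(msg.value())
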
---
 gstlal-ugly/bin/gstlal_etg | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/gstlal-ugly/bin/gstlal_etg b/gstlal-ugly/bin/gstlal_etg
index fc1a769e3f..6ad3fbd1e9 100755
--- a/gstlal-ugly/bin/gstlal_etg
+++ b/gstlal-ugly/bin/gstlal_etg
@@ -44,6 +44,7 @@ GObject.threads_init()
 Gst.init(None)
 import lal
 
+from confluent_kafka import Producer
 import numpy
 import pandas
 
@@ -213,6 +214,13 @@ class MultiChannelHandler(simplehandler.Handler):
 			bottle.route("/psds.xml")(self.web_get_psd_xml)
 			bottle.route("/etg_subset")(self.web_get_etg_data)
 
+		# set up kafka related properties
+		if options.use_kafka:
+			self.etg_partition = options.etg_partition
+			self.kafka_topic = options.kafka_topic
+			self.kafka_conf = {'bootstrap.servers': options.kafka_server}
+			self.producer = Producer(self.kafka_conf)
+
 		super(MultiChannelHandler, self).__init__(*args, **kwargs)
 
 	def do_on_message(self, bus, message):
@@ -254,7 +262,10 @@ class MultiChannelHandler(simplehandler.Handler):
 			if self.etg_event_time is None:
 				self.etg_event_time = buftime
 			if self.etg_event_time < buftime:
-				self.etg_data.append({self.etg_event_time: list(self.etg_event)})
+				if options.use_kafka:
+					self.producer.produce(timestamp = self.etg_event_time, topic = self.kafka_topic, value = json.dumps(list(self.etg_event)))
+				else:
+					self.etg_data.append({self.etg_event_time: list(self.etg_event)})
 				self.etg_event.clear()
 				self.etg_event_time = buftime
 
@@ -588,6 +599,10 @@ def parse_command_line():
 	parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
 	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
 	parser.add_option("--triggers-from-dataframe", action = "store_true", default = False, help = "If set, will output iDQ-compatible triggers to disk straight from dataframe once every cadence")
+	parser.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.")
+	parser.add_option("--etg-partition", metavar = "string", help = "If using Kafka, sets the partition that this ETG is assigned to.")
+	parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this ETG publishes feature vector subsets to.")
+	parser.add_option("--kafka-server", metavar = "string", help = "If using Kafka, sets the server URL that the Kafka topic is hosted on.")
 	parser.add_option("-m", "--mismatch", type = "float", default = 0.2, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.2.")
 	parser.add_option("-q", "--qhigh", type = "float", default = 20, help = "Q high value for half sine-gaussian waveforms. Default = 20.")
 	parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
-- 
GitLab