Skip to content
Snippets Groups Projects
Commit 87c1683a authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_etg: added ability to transfer ETG data through Kafka

parent 620e7577
No related branches found
No related tags found
No related merge requests found
......@@ -44,6 +44,7 @@ GObject.threads_init()
Gst.init(None)
import lal
from confluent_kafka import Producer
import numpy
import pandas
......@@ -213,6 +214,13 @@ class MultiChannelHandler(simplehandler.Handler):
bottle.route("/psds.xml")(self.web_get_psd_xml)
bottle.route("/etg_subset")(self.web_get_etg_data)
# set up kafka related properties
if options.use_kafka:
self.etg_parition = options.etg_partition
self.kafka_topic = options.kafka_topic
self.kafka_conf = {'bootstrap.servers': options.kafka_server}
self.producer = Producer(self.kafka_conf)
super(MultiChannelHandler, self).__init__(*args, **kwargs)
def do_on_message(self, bus, message):
......@@ -254,7 +262,10 @@ class MultiChannelHandler(simplehandler.Handler):
if self.etg_event_time is None:
self.etg_event_time = buftime
if self.etg_event_time < buftime:
self.etg_data.append({self.etg_event_time: list(self.etg_event)})
if options.use_kafka:
self.producer.produce(timestamp = self.etg_event_time, topic = self.kafka_topic, value = json.dumps(list(self.etg_event)))
else:
self.etg_data.append({self.etg_event_time: list(self.etg_event)})
self.etg_event.clear()
self.etg_event_time = buftime
......@@ -588,6 +599,10 @@ def parse_command_line():
parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--triggers-from-dataframe", action = "store_true", default = False, help = "If set, will output iDQ-compatible triggers to disk straight from dataframe once every cadence")
parser.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.")
parser.add_option("--etg-partition", metavar = "string", help = "If using Kafka, sets the partition that this ETG is assigned to.")
parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this ETG publishes feature vector subsets to.")
parser.add_option("--kafka-server", metavar = "string", help = "If using Kafka, sets the server url that the kafka topic is hosted on.")
parser.add_option("-m", "--mismatch", type = "float", default = 0.2, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.2.")
parser.add_option("-q", "--qhigh", type = "float", default = 20, help = "Q high value for half sine-gaussian waveforms. Default = 20.")
parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment