Skip to content
Snippets Groups Projects
Commit 0b3c1e3a authored by Patrick Godwin's avatar Patrick Godwin
Browse files

feature_extractor.py + related executable: make kafka an optional library with...

feature_extractor.py + related executable: make kafka an optional library with checks if data format needs kafka imports. auxcache.py: fixed typo in package name
parent bd6e01b7
No related branches found
No related tags found
No related merge requests found
...@@ -128,7 +128,7 @@ A program to extract features from auxiliary channel data in real time or in off ...@@ -128,7 +128,7 @@ A program to extract features from auxiliary channel data in real time or in off
### ###
import math import math
from optparse import OptionParser, OptionGroup import optparse
import os import os
import resource import resource
import socket import socket
...@@ -192,7 +192,7 @@ setrlimit(resource.RLIMIT_STACK, 1024 * 1024) # 1 MiB per thread ...@@ -192,7 +192,7 @@ setrlimit(resource.RLIMIT_STACK, 1024 * 1024) # 1 MiB per thread
def parse_command_line(): def parse_command_line():
parser = OptionParser(usage = '%prog [options]', description = __doc__) parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)
# First append datasource and feature extraction common options # First append datasource and feature extraction common options
multichannel_datasource.append_options(parser) multichannel_datasource.append_options(parser)
...@@ -248,10 +248,6 @@ instrument = data_source_info.instrument ...@@ -248,10 +248,6 @@ instrument = data_source_info.instrument
basename = '%s-%s' % (instrument[:1], options.description) basename = '%s-%s' % (instrument[:1], options.description)
waveforms = {} waveforms = {}
# only load kafka library if triggers are transferred via kafka topic
if options.save_format == 'kafka':
from confluent_kafka import Producer
# #
# set up logging # set up logging
# #
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
## @file ## @file
## @package hoftcache ## @package auxcache
# #
# ============================================================================= # =============================================================================
......
...@@ -44,8 +44,6 @@ from gi.repository import GObject, Gst ...@@ -44,8 +44,6 @@ from gi.repository import GObject, Gst
GObject.threads_init() GObject.threads_init()
Gst.init(None) Gst.init(None)
from confluent_kafka import Producer
import lal import lal
from lal import LIGOTimeGPS from lal import LIGOTimeGPS
...@@ -62,6 +60,11 @@ from gstlal import simplehandler ...@@ -62,6 +60,11 @@ from gstlal import simplehandler
from gstlal.fxtools import sngltriggertable from gstlal.fxtools import sngltriggertable
from gstlal.fxtools import utils from gstlal.fxtools import utils
# confluent_kafka is an optional dependency: only jobs that stream
# features over a kafka topic need it. Bind the module if available,
# otherwise leave a None sentinel that check_kafka() inspects later.
try:
    import confluent_kafka as kafka
except ImportError:
    kafka = None
# ============================= # =============================
# #
...@@ -144,11 +147,12 @@ class MultiChannelHandler(simplehandler.Handler): ...@@ -144,11 +147,12 @@ class MultiChannelHandler(simplehandler.Handler):
self.fdata.append(self.header) self.fdata.append(self.header)
elif self.save_format == 'kafka': elif self.save_format == 'kafka':
check_kafka()
self.data_transfer = options.data_transfer self.data_transfer = options.data_transfer
self.kafka_partition = options.kafka_partition self.kafka_partition = options.kafka_partition
self.kafka_topic = '_'.join([options.kafka_topic, self.job_id]) self.kafka_topic = '_'.join([options.kafka_topic, self.job_id])
self.kafka_conf = {'bootstrap.servers': options.kafka_server} self.kafka_conf = {'bootstrap.servers': options.kafka_server}
self.producer = Producer(self.kafka_conf) self.producer = kafka.Producer(self.kafka_conf)
elif self.save_format == 'bottle': elif self.save_format == 'bottle':
assert not options.disable_web_service, 'web service is not available to use bottle to transfer features' assert not options.disable_web_service, 'web service is not available to use bottle to transfer features'
...@@ -558,3 +562,10 @@ def append_options(parser): ...@@ -558,3 +562,10 @@ def append_options(parser):
group.add_option("--feature-start-time", type = "int", metavar = "seconds", help = "Set the start time of the segment to output features in GPS seconds. Required unless --data-source=lvshm") group.add_option("--feature-start-time", type = "int", metavar = "seconds", help = "Set the start time of the segment to output features in GPS seconds. Required unless --data-source=lvshm")
group.add_option("--feature-end-time", type = "int", metavar = "seconds", help = "Set the end time of the segment to output features in GPS seconds. Required unless --data-source=lvshm") group.add_option("--feature-end-time", type = "int", metavar = "seconds", help = "Set the end time of the segment to output features in GPS seconds. Required unless --data-source=lvshm")
parser.add_option_group(group) parser.add_option_group(group)
def check_kafka():
    """!
    Verify that the optional confluent_kafka dependency was imported.

    The module-level ``kafka`` name is set to None when the optional
    import at the top of the file failed; raise ImportError in that
    case so the caller fails fast before trying to build a Producer.
    """
    if kafka is not None:
        return
    raise ImportError("you're attempting to use kafka to transfer features, but confluent_kafka could not be imported")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment