From 0b3c1e3ab802f78d06ead908e8206f6e9b3ba311 Mon Sep 17 00:00:00 2001 From: Patrick Godwin <patrick.godwin@ligo.org> Date: Fri, 13 Jul 2018 07:00:06 -0700 Subject: [PATCH] feature_extractor.py + related executable: make kafka an optional library with checks if data format needs kafka imports. auxcache.py: fixed typo in package name --- gstlal-burst/bin/gstlal_feature_extractor | 8 ++------ gstlal-burst/python/fxtools/auxcache.py | 2 +- .../python/fxtools/feature_extractor.py | 17 ++++++++++++++--- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/gstlal-burst/bin/gstlal_feature_extractor b/gstlal-burst/bin/gstlal_feature_extractor index 3582b71fdb..18aa582a34 100755 --- a/gstlal-burst/bin/gstlal_feature_extractor +++ b/gstlal-burst/bin/gstlal_feature_extractor @@ -128,7 +128,7 @@ A program to extract features from auxiliary channel data in real time or in off ### import math -from optparse import OptionParser, OptionGroup +import optparse import os import resource import socket @@ -192,7 +192,7 @@ setrlimit(resource.RLIMIT_STACK, 1024 * 1024) # 1 MiB per thread def parse_command_line(): - parser = OptionParser(usage = '%prog [options]', description = __doc__) + parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__) # First append datasource and feature extraction common options multichannel_datasource.append_options(parser) @@ -248,10 +248,6 @@ instrument = data_source_info.instrument basename = '%s-%s' % (instrument[:1], options.description) waveforms = {} -# only load kafka library if triggers are transferred via kafka topic -if options.save_format == 'kafka': - from confluent_kafka import Producer - # # set up logging # diff --git a/gstlal-burst/python/fxtools/auxcache.py b/gstlal-burst/python/fxtools/auxcache.py index c93bf71924..061ee74258 100644 --- a/gstlal-burst/python/fxtools/auxcache.py +++ b/gstlal-burst/python/fxtools/auxcache.py @@ -16,7 +16,7 @@ ## @file -## @package hoftcache +## @package auxcache # # ============================================================================= diff --git a/gstlal-burst/python/fxtools/feature_extractor.py b/gstlal-burst/python/fxtools/feature_extractor.py index 4293d773f0..5de061ad45 100644 --- a/gstlal-burst/python/fxtools/feature_extractor.py +++ b/gstlal-burst/python/fxtools/feature_extractor.py @@ -44,8 +44,6 @@ from gi.repository import GObject, Gst GObject.threads_init() Gst.init(None) -from confluent_kafka import Producer - import lal from lal import LIGOTimeGPS @@ -62,6 +60,11 @@ from gstlal import simplehandler from gstlal.fxtools import sngltriggertable from gstlal.fxtools import utils +# set up confluent_kafka as an optional library +try: + import confluent_kafka as kafka +except ImportError: + kafka = None # ============================= # @@ -144,11 +147,12 @@ class MultiChannelHandler(simplehandler.Handler): self.fdata.append(self.header) elif self.save_format == 'kafka': + check_kafka() self.data_transfer = options.data_transfer self.kafka_partition = options.kafka_partition self.kafka_topic = '_'.join([options.kafka_topic, self.job_id]) self.kafka_conf = {'bootstrap.servers': options.kafka_server} - self.producer = Producer(self.kafka_conf) + self.producer = kafka.Producer(self.kafka_conf) elif self.save_format == 'bottle': assert not options.disable_web_service, 'web service is not available to use bottle to transfer features' @@ -558,3 +562,10 @@ def append_options(parser): group.add_option("--feature-start-time", type = "int", metavar = "seconds", help = "Set the start time of the segment to output features in GPS seconds. Required unless --data-source=lvshm") group.add_option("--feature-end-time", type = "int", metavar = "seconds", help = "Set the end time of the segment to output features in GPS seconds. Required unless --data-source=lvshm") parser.add_option_group(group) + +def check_kafka(): + """! + Checks if confluent_kafka was imported correctly. + """ + if kafka is None: + raise ImportError("you're attempting to use kafka to transfer features, but confluent_kafka could not be imported") -- GitLab