Skip to content
Snippets Groups Projects
Commit 609b8950 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

feature synchronizer + hdf5 sink: clean up imports, remove idq dependency that...

feature synchronizer + hdf5 sink: clean up imports, remove idq dependency that was not necessarily needed, fix kafka topic subscribing in synchronizer to look for job ids
parent 3565f713
No related branches found
No related tags found
No related merge requests found
...@@ -38,13 +38,10 @@ from confluent_kafka import Consumer, KafkaError ...@@ -38,13 +38,10 @@ from confluent_kafka import Consumer, KafkaError
import h5py import h5py
import numpy import numpy
from gstlal import multichannel_datasource
from gstlal import idq_utils
from gstlal import aggregator from gstlal import aggregator
from idq import io from gstlal.fxtools import multichannel_datasource
from idq import logs from gstlal.fxtools import utils
from idq import names
#------------------------------------------------- #-------------------------------------------------
# Functions # Functions
...@@ -251,8 +248,8 @@ if __name__ == '__main__': ...@@ -251,8 +248,8 @@ if __name__ == '__main__':
options, args = parse_command_line() options, args = parse_command_line()
### set up logging ### set up logging
logger = logs.get_logger( logger = utils.get_logger(
names.tag2logname(options.tag, 'hdf5_sink'), '-'.join([options.tag, 'hdf5_sink']),
log_level=options.log_level, log_level=options.log_level,
rootdir=options.rootdir, rootdir=options.rootdir,
verbose=options.verbose verbose=options.verbose
......
...@@ -39,9 +39,6 @@ from lal import LIGOTimeGPS ...@@ -39,9 +39,6 @@ from lal import LIGOTimeGPS
from confluent_kafka import Producer, Consumer, KafkaError from confluent_kafka import Producer, Consumer, KafkaError
from idq import logs
from idq import names
#------------------------------------------------- #-------------------------------------------------
# Functions # Functions
#------------------------------------------------- #-------------------------------------------------
...@@ -88,7 +85,7 @@ class StreamSynchronizer(object): ...@@ -88,7 +85,7 @@ class StreamSynchronizer(object):
self.num_topics = options.num_topics self.num_topics = options.num_topics
### initialize consumers ### initialize consumers
self.consumer_names = ['%s_%s' % (options.input_topic_basename, str(i).zfill(2)) for i in range(1, self.num_topics + 1)] self.consumer_names = ['%s_%s' % (options.input_topic_basename, str(i).zfill(4)) for i in range(1, self.num_topics + 1)]
# FIXME: hacky way of introducing group id, should be a settable option # FIXME: hacky way of introducing group id, should be a settable option
consumer_kafka_settings = self.kafka_settings consumer_kafka_settings = self.kafka_settings
consumer_kafka_settings['group.id'] = 'group_1' consumer_kafka_settings['group.id'] = 'group_1'
...@@ -240,8 +237,8 @@ if __name__ == '__main__': ...@@ -240,8 +237,8 @@ if __name__ == '__main__':
options, args = parse_command_line() options, args = parse_command_line()
### set up logging ### set up logging
logger = logs.get_logger( logger = utils.get_logger(
names.tag2logname(options.tag, 'synchronizer'), '-'.join([options.tag, 'synchronizer']),
log_level=options.log_level, log_level=options.log_level,
rootdir=options.rootdir, rootdir=options.rootdir,
verbose=options.verbose verbose=options.verbose
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment