Skip to content
Snippets Groups Projects
Commit 09db9429 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_feature_synchronizer + gstlal_ll_feature_extractor_pipe: fix issue...

gstlal_feature_synchronizer + gstlal_ll_feature_extractor_pipe: fix issue where sample rate of incoming features was not being taken into account, expose option to online DAG generation
parent d52631a9
No related branches found
No related tags found
No related merge requests found
Pipeline #37546 passed with warnings
......@@ -56,6 +56,7 @@ def parse_command_line():
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename, i.e. {basename}_%02d. Required.")
......@@ -82,6 +83,7 @@ class StreamSynchronizer(object):
self.processing_cadence = options.processing_cadence
self.request_timeout = options.request_timeout
self.latency_timeout = options.latency_timeout
self.sample_rate = options.sample_rate
self.no_drop = options.no_drop
self.is_running = False
......@@ -104,8 +106,8 @@ class StreamSynchronizer(object):
### initialize queues
self.last_timestamp = 0
# 1 minute queue for incoming buffers
self.feature_queue = PriorityQueue(maxsize = 60 * self.num_topics)
# 30 second queue for incoming buffers
self.feature_queue = PriorityQueue(maxsize = 30 * self.sample_rate * self.num_topics)
# 5 minute queue for outgoing buffers
self.feature_buffer = deque(maxlen = 300)
......
......@@ -327,6 +327,7 @@ if options.save_format == 'kafka':
synchronizer_options = {
"latency-timeout": options.latency_timeout,
"sample-rate": options.sample_rate,
"input-topic-basename": options.kafka_topic,
"output-topic-basename": '_'.join(['synchronizer', options.tag])
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment