Skip to content
Snippets Groups Projects
Commit e5511709 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_feature_synchronizer: add no_drop option to stop incoming features from...

gstlal_feature_synchronizer: add no_drop option to stop incoming features from getting dropped from a latency timeout, instead features after this stage will just get combined and pushed downstream
parent 054cee63
No related branches found
No related tags found
No related merge requests found
......@@ -56,6 +56,7 @@ def parse_command_line():
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename, i.e. {basename}_%02d. Required.")
parser.add_option("--output-topic-basename", metavar = "string", help = "Sets the output kafka topic name. Required.")
......@@ -81,6 +82,7 @@ class StreamSynchronizer(object):
self.processing_cadence = options.processing_cadence
self.request_timeout = options.request_timeout
self.latency_timeout = options.latency_timeout
self.no_drop = options.no_drop
self.is_running = False
### kafka settings
......@@ -121,7 +123,7 @@ class StreamSynchronizer(object):
feature_subset = json.loads(message.value())
### add to queue if timestamp is within timeout
if feature_subset['timestamp'] >= self.max_timeout():
if self.no_drop or (feature_subset['timestamp'] >= self.max_timeout()):
self.add_to_queue(feature_subset['timestamp'], feature_subset['features'])
def fetch_all_data(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment