From e551170985cd4cb780f907f77cf950b2e662f68b Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Mon, 29 Oct 2018 11:57:50 -0700
Subject: [PATCH] gstlal_feature_synchronizer: add no_drop option to stop
 incoming features from getting dropped from a latency timeout, instead
 features after this stage will just get combined and pushed downstream

---
 gstlal-burst/bin/gstlal_feature_synchronizer | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/gstlal-burst/bin/gstlal_feature_synchronizer b/gstlal-burst/bin/gstlal_feature_synchronizer
index 2363129a7e..caef73d054 100755
--- a/gstlal-burst/bin/gstlal_feature_synchronizer
+++ b/gstlal-burst/bin/gstlal_feature_synchronizer
@@ -56,6 +56,7 @@ def parse_command_line():
     parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
     parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
     parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
+    parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
     parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
     parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename, i.e. {basename}_%02d. Required.")
     parser.add_option("--output-topic-basename", metavar = "string", help = "Sets the output kafka topic name. Required.")
@@ -81,6 +82,7 @@ class StreamSynchronizer(object):
         self.processing_cadence = options.processing_cadence
         self.request_timeout = options.request_timeout
         self.latency_timeout = options.latency_timeout
+        self.no_drop = options.no_drop
         self.is_running = False
 
         ### kafka settings
@@ -121,7 +123,7 @@ class StreamSynchronizer(object):
             feature_subset = json.loads(message.value())
 
             ### add to queue if timestamp is within timeout
-            if feature_subset['timestamp'] >= self.max_timeout():
+            if self.no_drop or (feature_subset['timestamp'] >= self.max_timeout()):
                 self.add_to_queue(feature_subset['timestamp'], feature_subset['features'])
 
     def fetch_all_data(self):
-- 
GitLab