From 5ad9e7fb1e3772c9e9d6db3c3e3e5e088b2b0ad2 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Tue, 12 Feb 2019 19:13:43 -0800
Subject: [PATCH] rename gstlal_feature_listener to gstlal_feature_monitor for
 consistency, port to the new datamon aggregator classes

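Instead of writing rows to a local sqlite database, the monitor now
stores and aggregates its metrics (latency, percent of missed channels,
and target channel SNR) through the datamon aggregator classes, using
either an HDF5 or an InfluxDB backend selected at runtime.

As a rough sketch only (server, port, topic, and channel-count values
below are placeholders, not taken from this patch), the new backend
options might be used like:

    gstlal_feature_monitor \
        --kafka-server localhost:9092 \
        --input-topic-basename features \
        --instrument H1 \
        --num-channels 200 \
        --data-backend influx \
        --influx-hostname influx.example.com \
        --influx-port 8086 \
        --influx-database-name features \
        --data-type max \
        --num-processes 2
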
---
 gstlal-burst/bin/Makefile.am                  |   2 +-
 ...eature_listener => gstlal_feature_monitor} | 126 ++++++++++--------
 2 files changed, 74 insertions(+), 54 deletions(-)
 rename gstlal-burst/bin/{gstlal_feature_listener => gstlal_feature_monitor} (57%)

diff --git a/gstlal-burst/bin/Makefile.am b/gstlal-burst/bin/Makefile.am
index 7c1788ecf3..22c1a4015a 100644
--- a/gstlal-burst/bin/Makefile.am
+++ b/gstlal-burst/bin/Makefile.am
@@ -9,5 +9,5 @@ dist_bin_SCRIPTS = \
 	gstlal_feature_extractor_whitener_check \
 	gstlal_feature_extractor_template_overlap \
 	gstlal_feature_hdf5_sink \
-	gstlal_feature_listener \
+	gstlal_feature_monitor \
 	gstlal_feature_synchronizer
diff --git a/gstlal-burst/bin/gstlal_feature_listener b/gstlal-burst/bin/gstlal_feature_monitor
similarity index 57%
rename from gstlal-burst/bin/gstlal_feature_listener
rename to gstlal-burst/bin/gstlal_feature_monitor
index 05f68cf228..54fdde034b 100755
--- a/gstlal-burst/bin/gstlal_feature_listener
+++ b/gstlal-burst/bin/gstlal_feature_monitor
@@ -16,7 +16,7 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
-__usage__ = "gstlal_feature_listener [--options]"
+__usage__ = "gstlal_feature_monitor [--options]"
 __description__ = "an executable to collect and monitor streaming features"
 __author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
 
@@ -24,7 +24,7 @@ __author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
 #                  Preamble
 #-------------------------------------------------
 
-from collections import deque
+from collections import defaultdict, deque
 import json
 import optparse
 import os
@@ -46,7 +46,7 @@ from gstlal.fxtools import utils
 def parse_command_line():
 
     parser = optparse.OptionParser(usage=__usage__, description=__description__)
-    group = optparse.OptionGroup(parser, "Listener Options", "General settings for configuring the listener.")
+    group = optparse.OptionGroup(parser, "Monitor Options", "General settings for configuring the monitor.")
     group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
     group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
     group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
@@ -55,10 +55,16 @@ def parse_command_line():
     group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
     group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
     group.add_option("--num-channels", type = "int", help = "Set the full number of channels being processed upstream, used for monitoring purposes.")
-    group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
+    group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the monitor acquires and processes data. Default = 0.1 seconds.")
     group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
     group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
     group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
+    group.add_option("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
+    group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
+    group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+    group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
+    group.add_option("--data-type", action="append", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default all")
+    group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
     parser.add_option_group(group)
 
     options, args = parser.parse_args()
@@ -69,12 +75,12 @@ def parse_command_line():
 #                   Classes
 #-------------------------------------------------
 
-class StreamListener(object):
+class StreamMonitor(object):
     """
-    Listens to incoming streaming features, collects metrics and pushes relevant metrics to sqlite.
+    Monitors incoming streaming features, collects metrics, and pushes them to the configured data backend.
     """
     def __init__(self, logger, options):
-        logger.info('setting up feature listener...')
+        logger.info('setting up feature monitor...')
 
         ### initialize timing options
         self.request_timeout = options.request_timeout
@@ -84,7 +90,7 @@ class StreamListener(object):
 
         ### kafka settings
         self.kafka_settings = {'bootstrap.servers': options.kafka_server,
-                               'group.id': 'listener_%s'%options.tag}
+                               'group.id': 'monitor_%s'%options.tag}
 
         ### initialize consumers
         self.consumer = Consumer(self.kafka_settings)
@@ -93,11 +99,6 @@ class StreamListener(object):
         ### initialize queues
         self.feature_queue = deque(maxlen = 60 * self.sample_rate)
 
-        ### database settings
-        self.database_name = 'fx'
-        self.table_name = 'fxmonitor'
-        self.columns = ['target_snr', 'latency', 'percent_missed']
-
         ### other settings
         if options.target_channel:
             self.target_channel = options.target_channel
@@ -105,17 +106,27 @@ class StreamListener(object):
             self.target_channel = '%s:CAL-DELTAL_EXTERNAL_DQ'%options.instrument
         self.num_channels = options.num_channels
 
-
-        ### setup/connect to database
-        logger.info("initializing sqlite database...")
-        self.sqlite_path = os.path.join(options.rootdir, 'fx.sqlite')
-
-        # Check that the database file exists, if not, then initialize it:
-        if not os.path.isfile(self.sqlite_path):
-            logger.info("db does not exist, creating one...")
-
-        self.conn = io.sqlite.create_client(self.database_name, options.rootdir)
-        io.sqlite.create_table(self.conn, self.columns, self.table_name)
+        self.data_type = options.data_type if options.data_type else ['min', 'max', 'median']
+
+        ### keep track of last timestamp processed and saved
+        self.last_save = None
+        self.timestamp = None
+
+        ### set up aggregator 
+        logger.info("setting up monitor with backend: %s"%options.data_backend)
+        if options.data_backend == 'influx':
+            self.agg_sink = io.influx.InfluxDBAggregator(
+                hostname=options.influx_hostname,
+                port=options.influx_port,
+                db=options.influx_database_name,
+                reduce_across_tags=False,
+            )
+        else: ### hdf5 data backend
+            self.agg_sink = io.hdf5.HDF5Aggregator(
+                rootdir=options.rootdir,
+                num_processes=options.num_processes,
+                reduce_across_tags=False,
+            )
 
     def fetch_data(self):
         """
@@ -133,42 +144,51 @@ class StreamListener(object):
         """
         add a set of features for a given timestamp to the feature queue
         """
-        logger.info('processing features for timestamp %f' % timestamp)
         self.feature_queue.appendleft((timestamp, data))
+        self.timestamp = timestamp
 
     def process_queue(self):
         """
-        process a single buffer, generate metrics and insert row into db
+        process features from the synchronizer and generate metrics on a regular cadence
         """
-        ### remove data with oldest timestamp and process
-        self.timestamp, features = self.feature_queue.pop()
-        latency = utils.gps2latency(self.timestamp) 
-        logger.info('processing features for timestamp %f, latency = %.3f s' % (self.timestamp, latency))
-
-        ### generate metrics
-        percent_missed = 100 * (float(self.num_channels - len(features.keys())) / self.num_channels)
-        if features.has_key(self.target_channel):
-            target_snr = features[self.target_channel][0]['snr']
-        else:
-            target_snr = 0.
+        if self.timestamp:
+            if not self.last_save or utils.in_new_epoch(self.timestamp, self.last_save, 1):
+
+                metrics = defaultdict(list)
+                while len(self.feature_queue) > 0:
+                    ### remove data with oldest timestamp and process
+                    timestamp, features = self.feature_queue.pop()
+                    latency = utils.gps2latency(timestamp) 
+
+                    ### generate metrics
+                    metrics['time'].append(timestamp)
+                    metrics['latency'].append(latency)
+                    metrics['percent_missed'].append(100 * (float(self.num_channels - len(features.keys())) / self.num_channels))
+
+                    if features.has_key(self.target_channel):
+                        metrics['target_time'].append(timestamp)
+                        metrics['target_snr'].append(features[self.target_channel][0]['snr'])
 
-        data = {'target_snr': [target_snr], 'latency': [latency], 'percent_missed': [percent_missed]}
+                ### store and aggregate metrics
+                for metric in ('latency', 'percent_missed'):
+                    self.agg_sink.store_and_reduce(metric, {'synchronizer': [metrics['time'], metrics[metric]]}, 'data', tags='job', aggregates=self.data_type)
+                if len(metrics['target_time']) > 0:
+                    self.agg_sink.store_and_reduce('target_snr', {'synchronizer': [metrics['target_time'], metrics['target_snr']]}, 'data', tags='job', aggregates=self.data_type)
 
-        ### add to database
-        io.sqlite.store_timeseries(self.conn, self.columns, [self.timestamp], data, self.table_name)
+                self.last_save = timestamp
+                logger.info('processed features up to timestamp %.3f, max latency = %.3f s' % (timestamp, max(metrics['latency'])))
 
     def start(self):
         """
-        starts ingesting features and monitoring and pushes metrics to sqlite
+        starts ingesting features, monitors them, and pushes metrics to the data backend
         """
-        logger.info('starting feature listener...')
+        logger.info('starting feature monitor...')
         self.is_running = True
         while self.is_running:
             ### ingest incoming features
             self.fetch_data()
-            ### push combined features downstream
-            while self.feature_queue:
-                self.process_queue()
+            ### store and aggregate generated metrics
+            self.process_queue()
             ### repeat with processing cadence
             time.sleep(self.processing_cadence)
 
@@ -176,20 +196,20 @@ class StreamListener(object):
         """
         shut down gracefully
         """
-        logger.info('shutting down feature listener...')
+        logger.info('shutting down feature monitor...')
-        self.conn.close()
+        self.consumer.close()
 
 class SignalHandler(object):
     """
-    helper class to shut down the stream listener gracefully before exiting
+    helper class to shut down the stream monitor gracefully before exiting
     """
-    def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
-        self.listener = listener
+    def __init__(self, monitor, signals = [signal.SIGINT, signal.SIGTERM]):
+        self.monitor = monitor
         for sig in signals:
             signal.signal(sig, self)
 
     def __call__(self, signum, frame):
-        self.listener.stop()
+        self.monitor.stop()
         sys.exit(0)
 
 
@@ -203,17 +223,17 @@ if __name__ == '__main__':
 
     ### set up logging
     logger = utils.get_logger(
-        '-'.join([options.tag, 'feature_listener']),
+        '-'.join([options.tag, 'feature_monitor']),
         log_level=options.log_level,
         rootdir=options.rootdir,
         verbose=options.verbose
     )
 
     # create summary instance
-    listener = StreamListener(logger, options=options)
+    monitor = StreamMonitor(logger, options=options)
 
     # install signal handler
-    SignalHandler(listener)
+    SignalHandler(monitor)
 
-    # start up listener
-    listener.start()
+    # start up monitor
+    monitor.start()
-- 
GitLab