From dfe2fc14b0d0a8ed8c78c1b08584695a5cd4605d Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Tue, 27 Nov 2018 11:56:11 -0800
Subject: [PATCH] add gstlal_feature_listener for online feature extraction
 monitoring

---
 gstlal-burst/bin/Makefile.am             |   1 +
 gstlal-burst/bin/gstlal_feature_listener | 233 +++++++++++++++++++++++
 2 files changed, 234 insertions(+)
 create mode 100755 gstlal-burst/bin/gstlal_feature_listener

diff --git a/gstlal-burst/bin/Makefile.am b/gstlal-burst/bin/Makefile.am
index 3906c232cc..9a04e30624 100644
--- a/gstlal-burst/bin/Makefile.am
+++ b/gstlal-burst/bin/Makefile.am
@@ -8,4 +8,5 @@ dist_bin_SCRIPTS = \
 	gstlal_feature_extractor_whitener_check \
 	gstlal_feature_extractor_template_overlap \
 	gstlal_feature_hdf5_sink \
+	gstlal_feature_listener \
 	gstlal_feature_synchronizer
diff --git a/gstlal-burst/bin/gstlal_feature_listener b/gstlal-burst/bin/gstlal_feature_listener
new file mode 100755
index 0000000000..b771b8bc85
--- /dev/null
+++ b/gstlal-burst/bin/gstlal_feature_listener
@@ -0,0 +1,233 @@
+#!/usr/bin/env python
+
+# Copyright (C) 2018  Patrick Godwin
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+__usage__ = "gstlal_feature_listener [--options]"
+__description__ = "an executable to collect and monitor streaming features"
+__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
+
+#-------------------------------------------------
+#                  Preamble
+#-------------------------------------------------
+
+from collections import deque
+import json
+import optparse
+import os
+import signal
+import sqlite3
+import sys
+import time
+
+from confluent_kafka import Consumer, KafkaError
+
+from gstlal.fxtools import utils
+
+#-------------------------------------------------
+#                  Functions
+#-------------------------------------------------
+
+def parse_command_line():
+
+    parser = optparse.OptionParser(usage=__usage__, description=__description__)
+    group = optparse.OptionGroup(parser, "Listener Options", "General settings for configuring the listener.")
+    group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
+    group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
+    group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
+    group.add_option("--target-channel", metavar = "string", default = "CAL-DELTAL_EXTERNAL_DQ", help = "Sets the target channel to view. Default = CAL-DELTAL_EXTERNAL_DQ")
+    group.add_option("--rootdir", metavar = "path", default = ".", help = "Location where log messages and sqlite database lives")
+    group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
+    group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
+    group.add_option("--num-channels", type = "int", help = "Set the full number of channels being processed upstream, used for monitoring purposes.")
+    group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
+    group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
+    group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
+    group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
+    parser.add_option_group(group)
+
+    options, args = parser.parse_args()
+
+    ### enforce required options
+    if not all((options.kafka_server, options.input_topic_basename, options.num_channels)):
+        parser.error('--kafka-server, --input-topic-basename and --num-channels are required')
+    return options, args
+
+#-------------------------------------------------
+#                   Classes
+#-------------------------------------------------
+
+class StreamListener(object):
+    """
+    Listens to incoming streaming features, computes monitoring metrics and pushes them to sqlite.
+    """
+    def __init__(self, logger, options):
+        self.logger = logger
+        self.logger.info('setting up feature listener...')
+
+        ### initialize timing options
+        self.request_timeout = options.request_timeout
+        self.processing_cadence = options.processing_cadence
+        self.sample_rate = options.sample_rate
+        self.is_running = False
+
+        ### kafka settings
+        self.kafka_settings = {'bootstrap.servers': options.kafka_server,
+                               'group.id': 'listener_%s'%options.tag}
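+        ### note: a per-tag group.id places each listener in its own consumer
+        ### group, so it sees the full topic stream rather than splitting
+        ### partitions with other consumers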
+
+        ### initialize consumers
+        self.consumer = Consumer(self.kafka_settings)
+        self.consumer.subscribe([options.input_topic_basename])
+
+        ### initialize queues
+        self.feature_queue = deque(maxlen = 60 * self.sample_rate)
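+        ### with maxlen = 60 * sample_rate, the queue buffers at most one
+        ### minute of features; the oldest entries are dropped if the
+        ### listener falls behind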
+
+        ### other settings
+        self.target_channel = '%s:%s' % (options.instrument, options.target_channel)
+        self.num_channels = options.num_channels
+
+        ### initialize DB
+        logger.info("initializing sqlite database...")
+        self.sqlite_path = os.path.join(options.rootdir, 'fx.sqlite')
+
+        # sqlite3.connect() will create the database file if it does not
+        # already exist; log when a fresh one is being created
+        if not os.path.isfile(self.sqlite_path):
+            self.logger.info("db does not exist, creating one...")
+
+        # connect to DB (creating the file if needed) and set up the table
+        self.conn = sqlite3.connect(self.sqlite_path)
+        self.initialize_db()
+
+    def fetch_data(self):
+        """
+        requests for a new message from an individual topic,
+        and add to the feature queue
+        """
+        message = self.consumer.poll(timeout=self.request_timeout)
+
+        ### only add to queue if no errors in receiving data
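+        ### the payload is assumed (based on the keys accessed below) to be
+        ### JSON of the form:
+        ###   {"timestamp": <gps time>,
+        ###    "features": {"<ifo>:<channel>": [{"snr": ..., ...}, ...], ...}}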
+        if message and not message.error():
+            features = json.loads(message.value())
+            self.add_to_queue(features['timestamp'], features['features'])
+
+    def add_to_queue(self, timestamp, data):
+        """
+        add a set of features for a given timestamp to the feature queue
+        """
+        self.logger.info('queueing features for timestamp %f' % timestamp)
+        self.feature_queue.appendleft((timestamp, data))
+
+    def process_queue(self):
+        """
+        process a single buffer, generate metrics and insert row into db
+        """
+        ### remove data with oldest timestamp and process
+        self.timestamp, features = self.feature_queue.pop()
+        latency = utils.gps2latency(self.timestamp)
+        self.logger.info('processing features for timestamp %f, latency = %.3f s' % (self.timestamp, latency))
+
+        ### format row; use a float denominator to avoid integer division
+        percent_missed = 100. * (self.num_channels - len(features)) / float(self.num_channels)
+        if self.target_channel in features:
+            target_snr = features[self.target_channel][0]['snr']
+        else:
+            target_snr = 0.
+
+        data = (self.timestamp, latency, percent_missed, target_snr)
+
+        ### insert row into DB
+        c = self.conn.cursor()
+        sql = ''' INSERT INTO fxmonitor(timestamp,latency,percent_missed,target_snr) VALUES(?,?,?,?) '''
+        c.execute(sql,data)
+        self.conn.commit()
+
+    def start(self):
+        """
+        start ingesting features, compute monitoring metrics and push them to sqlite
+        """
+        self.logger.info('starting feature listener...')
+        self.is_running = True
+        while self.is_running:
+            ### ingest incoming features
+            self.fetch_data()
+            ### process queued features and record metrics
+            while self.feature_queue:
+                self.process_queue()
+            ### repeat with processing cadence
+            time.sleep(self.processing_cadence)
+
+    def stop(self):
+        """
+        shut down gracefully
+        """
+        self.logger.info('shutting down feature listener...')
+        self.is_running = False
+        self.consumer.close()
+        self.conn.close()
+
+    def initialize_db(self):
+        """
+        create the fxmonitor table to store monitoring metrics,
+        if it does not already exist
+        """
+        c = self.conn.cursor()
+
+        fx_table = """ CREATE TABLE IF NOT EXISTS fxmonitor (
+                                        timestamp real PRIMARY KEY,
+                                        latency real,
+                                        percent_missed real,
+                                        target_snr real
+                                    ); """
+
+        c.execute(fx_table)
+
+        ### commit the table creation
+        self.conn.commit()
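+
+        ### downstream monitors can then read recent metrics with a query
+        ### along the lines of:
+        ###   SELECT timestamp, latency, percent_missed, target_snr
+        ###   FROM fxmonitor ORDER BY timestamp DESC LIMIT 60;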
+
+class SignalHandler(object):
+    """
+    helper class to shut down the stream listener gracefully before exiting
+    """
+    def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
+        self.listener = listener
+        for sig in signals:
+            signal.signal(sig, self)
+
+    def __call__(self, signum, frame):
+        self.listener.stop()
+        sys.exit(0)
+
+
+#-------------------------------------------------
+#                    Main
+#-------------------------------------------------
+
+if __name__ == '__main__':
+    # parse arguments
+    options, args = parse_command_line()
+
+    ### set up logging
+    logger = utils.get_logger(
+        '-'.join([options.tag, 'feature_listener']),
+        log_level=options.log_level,
+        rootdir=options.rootdir,
+        verbose=options.verbose
+    )
+
+    # create listener instance
+    listener = StreamListener(logger, options=options)
+
+    # install signal handler
+    SignalHandler(listener)
+
+    # start up listener
+    listener.start()
-- 
GitLab