Commit dfe2fc14 authored by Patrick Godwin

add gstlal_feature_listener for online feature extraction monitoring

parent fb38b02b
@@ -8,4 +8,5 @@ dist_bin_SCRIPTS = \
 	gstlal_feature_extractor_whitener_check \
 	gstlal_feature_extractor_template_overlap \
 	gstlal_feature_hdf5_sink \
+	gstlal_feature_listener \
 	gstlal_feature_synchronizer

gstlal_feature_listener (new file):
#!/usr/bin/env python
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_listener [--options]"
__description__ = "an executable to collect and monitor streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
# Preamble
#-------------------------------------------------
from collections import deque
import json
import optparse
import os
import signal
import sqlite3
import sys
import time
from confluent_kafka import Consumer, KafkaError
from gstlal.fxtools import utils
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
    parser = optparse.OptionParser(usage=__usage__, description=__description__)

    group = optparse.OptionGroup(parser, "Listener Options", "General settings for configuring the listener.")
    group.add_option("-v", "--verbose", default=False, action="store_true", help="Print to stdout in addition to writing to automatically generated log.")
    group.add_option("--log-level", type="int", default=10, help="Sets the verbosity of logging. Default = 10.")
    group.add_option("--instrument", metavar="string", default="H1", help="Sets the instrument for files written to disk. Default = H1.")
    group.add_option("--target-channel", metavar="string", default="CAL-DELTAL_EXTERNAL_DQ", help="Sets the target channel to view. Default = CAL-DELTAL_EXTERNAL_DQ.")
    group.add_option("--rootdir", metavar="path", default=".", help="Location where log messages and the sqlite database live. Default = '.'.")
    group.add_option("--tag", metavar="string", default="test", help="Sets the name of the tag used. Default = 'test'.")
    group.add_option("--sample-rate", type="int", metavar="Hz", default=1, help="Sets the sample rate for feature timeseries output; must be a power of 2. Default = 1 Hz.")
    group.add_option("--num-channels", type="int", help="Sets the full number of channels being processed upstream; used for monitoring purposes.")
    group.add_option("--processing-cadence", type="float", default=0.1, help="Rate at which the listener acquires and processes data. Default = 0.1 seconds.")
    group.add_option("--request-timeout", type="float", default=0.2, help="Timeout for requesting messages from a topic. Default = 0.2 seconds.")
    group.add_option("--kafka-server", metavar="string", help="Sets the server url that the kafka topic is hosted on. Required.")
    group.add_option("--input-topic-basename", metavar="string", help="Sets the input kafka topic basename. Required.")
    parser.add_option_group(group)

    options, args = parser.parse_args()

    return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class StreamListener(object):
    """
    Listens to incoming streaming features, collects metrics and pushes relevant metrics to sqlite.
    """
    def __init__(self, logger, options):
        logger.info('setting up feature listener...')
        self.logger = logger

        ### initialize timing options
        self.request_timeout = options.request_timeout
        self.processing_cadence = options.processing_cadence
        self.sample_rate = options.sample_rate
        self.is_running = False

        ### kafka settings
        self.kafka_settings = {
            'bootstrap.servers': options.kafka_server,
            'group.id': 'listener_%s' % options.tag
        }

        ### initialize consumers
        self.consumer = Consumer(self.kafka_settings)
        self.consumer.subscribe([options.input_topic_basename])

        ### initialize queues (holds up to one minute of features at the given sample rate)
        self.feature_queue = deque(maxlen = 60 * self.sample_rate)

        ### other settings
        self.target_channel = '%s:%s' % (options.instrument, options.target_channel)
        self.num_channels = options.num_channels

        ### initialize DB
        logger.info("initializing sqlite database...")
        self.sqlite_path = os.path.join(options.rootdir, 'fx.sqlite')

        # check whether the database file already exists; connecting will create it if not
        if not os.path.isfile(self.sqlite_path):
            logger.info("db does not exist, creating one...")

        # connect to DB and create the monitoring table if needed
        self.conn = sqlite3.connect(self.sqlite_path)
        self.initialize_db()
    def fetch_data(self):
        """
        request a new message from an individual topic
        and add it to the feature queue
        """
        message = self.consumer.poll(timeout=self.request_timeout)

        ### only add to queue if no errors in receiving data
        if message and not message.error():
            features = json.loads(message.value())
            self.add_to_queue(features['timestamp'], features['features'])
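    # NOTE: incoming messages are assumed to be JSON payloads of the form
    # inferred from the parsing above; channel names and values here are
    # purely illustrative:
    #
    #   {"timestamp": 1234567890.0,
    #    "features": {"H1:CAL-DELTAL_EXTERNAL_DQ": [{"snr": 5.2, ...}, ...], ...}}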
    def add_to_queue(self, timestamp, data):
        """
        add a set of features for a given timestamp to the feature queue
        """
        self.logger.info('queueing features for timestamp %f' % timestamp)
        self.feature_queue.appendleft((timestamp, data))
    def process_queue(self):
        """
        process a single buffer, generate metrics and insert a row into the db
        """
        ### remove data with oldest timestamp and process
        self.timestamp, features = self.feature_queue.pop()
        latency = utils.gps2latency(self.timestamp)
        self.logger.info('processing features for timestamp %f, latency = %.3f s' % (self.timestamp, latency))

        ### format row
        # cast to float so the fraction is not truncated by integer division
        percent_missed = 100. * ((self.num_channels - len(features)) / float(self.num_channels))
        if self.target_channel in features:
            target_snr = features[self.target_channel][0]['snr']
        else:
            target_snr = 0.
        data = (self.timestamp, latency, percent_missed, target_snr)

        ### insert row into DB
        c = self.conn.cursor()
        sql = ''' INSERT INTO fxmonitor(timestamp,latency,percent_missed,target_snr) VALUES(?,?,?,?) '''
        c.execute(sql, data)
        self.conn.commit()
    def start(self):
        """
        starts ingesting features, generating metrics, and pushing them to sqlite
        """
        self.logger.info('starting feature listener...')
        self.is_running = True
        while self.is_running:
            ### ingest incoming features
            self.fetch_data()

            ### process queued features and push metrics to the db
            while self.feature_queue:
                self.process_queue()

            ### repeat with processing cadence
            time.sleep(self.processing_cadence)
    def stop(self):
        """
        shut down gracefully
        """
        self.logger.info('shutting down feature listener...')
        self.is_running = False
        self.conn.close()
    def initialize_db(self):
        """
        create the fxmonitor table if it does not already exist
        """
        c = self.conn.cursor()
        fx_table = """ CREATE TABLE IF NOT EXISTS fxmonitor (
                           timestamp real PRIMARY KEY,
                           latency real,
                           percent_missed real,
                           target_snr real
                       ); """
        c.execute(fx_table)

        ### commit changes to the database file
        self.conn.commit()
class SignalHandler(object):
    """
    helper class to shut down the stream listener gracefully before exiting
    """
    def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
        self.listener = listener
        for sig in signals:
            signal.signal(sig, self)

    def __call__(self, signum, frame):
        self.listener.stop()
        sys.exit(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
    # parse arguments
    options, args = parse_command_line()

    ### set up logging
    logger = utils.get_logger(
        '-'.join([options.tag, 'feature_listener']),
        log_level=options.log_level,
        rootdir=options.rootdir,
        verbose=options.verbose
    )

    # create listener instance
    listener = StreamListener(logger, options=options)

    # install signal handler
    SignalHandler(listener)

    # start up listener
    listener.start()
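For context, here is a minimal sketch of how the metrics written by the listener could be read back for inspection. It assumes the listener ran with --rootdir '.' so the database lives at fx.sqlite, and it relies only on the fxmonitor schema created above; the row limit is an arbitrary choice for illustration:

import sqlite3

# connect to the database written by the listener (path assumes --rootdir '.')
conn = sqlite3.connect('fx.sqlite')
c = conn.cursor()

# grab the most recent monitoring rows, newest first
c.execute(
    "SELECT timestamp, latency, percent_missed, target_snr "
    "FROM fxmonitor ORDER BY timestamp DESC LIMIT 10"
)
for timestamp, latency, percent_missed, target_snr in c.fetchall():
    print('%.3f: latency = %.3f s, missed = %.1f%%, target snr = %.2f'
          % (timestamp, latency, percent_missed, target_snr))

conn.close()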