Commit 5ad9e7fb authored by Patrick Godwin

rename gstlal_feature_listener to gstlal_feature_monitor for consistency, port to using new datamon aggregator classes
parent efc42702
@@ -9,5 +9,5 @@ dist_bin_SCRIPTS = \
 	gstlal_feature_extractor_whitener_check \
 	gstlal_feature_extractor_template_overlap \
 	gstlal_feature_hdf5_sink \
-	gstlal_feature_listener \
+	gstlal_feature_monitor \
 	gstlal_feature_synchronizer
@@ -16,7 +16,7 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-__usage__ = "gstlal_feature_listener [--options]"
+__usage__ = "gstlal_feature_monitor [--options]"
 __description__ = "an executable to collect and monitor streaming features"
 __author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
@@ -24,7 +24,7 @@ __author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
 # Preamble
 #-------------------------------------------------
 
-from collections import deque
+from collections import defaultdict, deque
 import json
 import optparse
 import os
@@ -46,7 +46,7 @@ from gstlal.fxtools import utils
 def parse_command_line():
 	parser = optparse.OptionParser(usage=__usage__, description=__description__)
 
-	group = optparse.OptionGroup(parser, "Listener Options", "General settings for configuring the listener.")
+	group = optparse.OptionGroup(parser, "Monitor Options", "General settings for configuring the monitor.")
 	group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
 	group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
 	group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
@@ -55,10 +55,16 @@ def parse_command_line():
 	group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
 	group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
 	group.add_option("--num-channels", type = "int", help = "Set the full number of channels being processed upstream, used for monitoring purposes.")
-	group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
+	group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the monitor acquires and processes data. Default = 0.1 seconds.")
 	group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
 	group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
 	group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
+	group.add_option("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. Default = hdf5.")
+	group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
+	group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+	group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
+	group.add_option("--data-type", action="append", help = "Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default = all.")
+	group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently. Default = 2.")
 	parser.add_option_group(group)
 
 	options, args = parser.parse_args()
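
A note on the new --data-type option: optparse's action="append" collects repeated flags into a list, and leaving the flag off yields None, which the monitor treats as "use all aggregates". A minimal stdlib-only illustration (the option name is copied from above; everything else is hypothetical):

    import optparse

    parser = optparse.OptionParser()
    parser.add_option("--data-type", action="append",
            help="Specify datatypes to aggregate from 'min', 'max', 'median'.")

    # Repeating the flag accumulates values into a list:
    opts, _ = parser.parse_args(["--data-type", "min", "--data-type", "max"])
    print(opts.data_type)    # ['min', 'max']

    # Omitting the flag leaves the option as None ("all" by convention here):
    opts, _ = parser.parse_args([])
    print(opts.data_type)    # None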
@@ -69,12 +75,12 @@
 # Classes
 #-------------------------------------------------
 
-class StreamListener(object):
+class StreamMonitor(object):
 	"""
 	Listens to incoming streaming features, collects metrics and pushes relevant metrics to sqlite.
 	"""
 	def __init__(self, logger, options):
-		logger.info('setting up feature listener...')
+		logger.info('setting up feature monitor...')
 
 		### initialize timing options
 		self.request_timeout = options.request_timeout
@@ -84,7 +90,7 @@ class StreamListener(object):
 
 		### kafka settings
 		self.kafka_settings = {'bootstrap.servers': options.kafka_server,
-			'group.id': 'listener_%s'%options.tag}
+			'group.id': 'monitor_%s'%options.tag}
 
 		### initialize consumers
 		self.consumer = Consumer(self.kafka_settings)
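
The body of fetch_data is elided by this diff. For orientation, a minimal confluent-kafka poll loop has the shape sketched below; the topic name, timeout, and server are illustrative assumptions, not values taken from this file, and the JSON decoding assumes the payloads match the json import in the preamble:

    import json
    from confluent_kafka import Consumer

    # Illustrative settings; the real ones come from the parsed options above.
    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': 'monitor_test'})
    consumer.subscribe(['features_test'])    # hypothetical topic name

    # poll() returns None if no message arrives within the timeout.
    msg = consumer.poll(timeout=0.2)
    if msg is not None and not msg.error():
        features = json.loads(msg.value())    # assumed JSON-encoded payload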
@@ -93,11 +99,6 @@ class StreamListener(object):
 		### initialize queues
 		self.feature_queue = deque(maxlen = 60 * self.sample_rate)
 
-		### database settings
-		self.database_name = 'fx'
-		self.table_name = 'fxmonitor'
-		self.columns = ['target_snr', 'latency', 'percent_missed']
-
 		### other settings
 		if options.target_channel:
 			self.target_channel = options.target_channel
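
The feature queue above is a bounded deque sized to one minute of data at the configured sample rate; once full, each appendleft silently drops the oldest entry from the opposite end. A quick stdlib illustration:

    from collections import deque

    sample_rate = 1                        # the --sample-rate default
    q = deque(maxlen=60 * sample_rate)     # at most 60 seconds of features

    for ts in range(61):
        q.appendleft((ts, {}))             # same appendleft pattern as the monitor

    print(len(q))     # 60 -- the oldest entry (ts == 0) was discarded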
@@ -105,17 +106,27 @@ class StreamListener(object):
 			self.target_channel = '%s:CAL-DELTAL_EXTERNAL_DQ'%options.instrument
 		self.num_channels = options.num_channels
 
-		### setup/connect to database
-		logger.info("initializing sqlite database...")
-		self.sqlite_path = os.path.join(options.rootdir, 'fx.sqlite')
-
-		# Check that the database file exists, if not, then initialize it:
-		if not os.path.isfile(self.sqlite_path):
-			logger.info("db does not exist, creating one...")
-
-		self.conn = io.sqlite.create_client(self.database_name, options.rootdir)
-		io.sqlite.create_table(self.conn, self.columns, self.table_name)
+		self.data_type = options.data_type
+
+		### keep track of last timestamp processed and saved
+		self.last_save = None
+		self.timestamp = None
+
+		### set up aggregator
+		logger.info("setting up monitor with backend: %s"%options.data_backend)
+		if options.data_backend == 'influx':
+			self.agg_sink = io.influx.InfluxDBAggregator(
+				hostname=options.influx_hostname,
+				port=options.influx_port,
+				db=options.influx_database_name,
+				reduce_across_tags=False,
+			)
+		else: ### hdf5 data backend
+			self.agg_sink = io.hdf5.HDF5Aggregator(
+				rootdir=options.rootdir,
+				num_processes=options.num_processes,
+				reduce_across_tags=False,
+			)
 
 	def fetch_data(self):
 		"""
@@ -133,42 +144,51 @@ class StreamListener(object):
 		"""
 		add a set of features for a given timestamp to the feature queue
 		"""
-		logger.info('processing features for timestamp %f' % timestamp)
 		self.feature_queue.appendleft((timestamp, data))
+		self.timestamp = timestamp
 
 	def process_queue(self):
 		"""
-		process a single buffer, generate metrics and insert row into db
+		process features and generate metrics from synchronizer on a regular cadence
 		"""
-		### remove data with oldest timestamp and process
-		self.timestamp, features = self.feature_queue.pop()
-		latency = utils.gps2latency(self.timestamp)
-		logger.info('processing features for timestamp %f, latency = %.3f s' % (self.timestamp, latency))
-
-		### generate metrics
-		percent_missed = 100 * (float(self.num_channels - len(features.keys())) / self.num_channels)
-		if features.has_key(self.target_channel):
-			target_snr = features[self.target_channel][0]['snr']
-		else:
-			target_snr = 0.
-
-		data = {'target_snr': [target_snr], 'latency': [latency], 'percent_missed': [percent_missed]}
-
-		### add to database
-		io.sqlite.store_timeseries(self.conn, self.columns, [self.timestamp], data, self.table_name)
+		if self.timestamp:
+			if not self.last_save or utils.in_new_epoch(self.timestamp, self.last_save, 1):
+				metrics = defaultdict(list)
+				while len(self.feature_queue) > 0:
+					### remove data with oldest timestamp and process
+					timestamp, features = self.feature_queue.pop()
+					latency = utils.gps2latency(timestamp)
+
+					### generate metrics
+					metrics['time'].append(timestamp)
+					metrics['latency'].append(latency)
+					metrics['percent_missed'].append(100 * (float(self.num_channels - len(features.keys())) / self.num_channels))
+					if features.has_key(self.target_channel):
+						metrics['target_time'].append(timestamp)
+						metrics['target_snr'].append(features[self.target_channel][0]['snr'])
+
+				### store and aggregate features
+				for metric in ('latency', 'percent_missed'):
+					self.agg_sink.store_and_reduce(metric, {'synchronizer': [metrics['time'], metrics[metric]]}, 'data', tags='job', aggregates=self.data_type)
+				if len(metrics['target_time']) > 0:
+					self.agg_sink.store_and_reduce('target_snr', {'synchronizer': [metrics['target_time'], metrics['target_snr']]}, 'data', tags='job', aggregates=self.data_type)
+
+				self.last_save = timestamp
+				logger.info('processed features up to timestamp %.3f, max latency = %.3f s' % (timestamp, max(metrics['latency'])))
 
 	def start(self):
 		"""
 		starts ingesting features and monitoring and pushes metrics to sqlite
 		"""
-		logger.info('starting feature listener...')
+		logger.info('starting feature monitor...')
 		self.is_running = True
 		while self.is_running:
 			### ingest incoming features
 			self.fetch_data()
-			### push combined features downstream
-			while self.feature_queue:
-				self.process_queue()
+			### store and aggregate generated metrics
+			self.process_queue()
 			### repeat with processing cadence
 			time.sleep(self.processing_cadence)
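
The reworked process_queue batches its writes: nothing is stored until the latest timestamp crosses into a new 1-second epoch relative to the last save, at which point the whole queue is drained and aggregated at once. Assuming utils.in_new_epoch tests whether two GPS times fall in different fixed-width windows (an assumption; the utility is not shown in this diff), the gating reduces to:

    import math

    def in_new_epoch(new_time, last_time, epoch_size):
        """Assumed semantics of utils.in_new_epoch: True when the two
        times land in different epoch_size-second windows."""
        return math.floor(new_time / epoch_size) != math.floor(last_time / epoch_size)

    last_save = None
    for timestamp in (1000000000.2, 1000000000.7, 1000000001.1):
        if not last_save or in_new_epoch(timestamp, last_save, 1):
            last_save = timestamp    # flush point: drain queue, store metrics

    print(last_save)    # 1000000001.1 -- the middle timestamp did not flush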
@@ -176,20 +196,20 @@ class StreamListener(object):
 		"""
 		shut down gracefully
 		"""
-		logger.info('shutting down feature listener...')
+		logger.info('shutting down feature monitor...')
 		self.conn.close()
 
 
 class SignalHandler(object):
 	"""
-	helper class to shut down the stream listener gracefully before exiting
+	helper class to shut down the stream monitor gracefully before exiting
 	"""
-	def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
-		self.listener = listener
+	def __init__(self, monitor, signals = [signal.SIGINT, signal.SIGTERM]):
+		self.monitor = monitor
 		for sig in signals:
 			signal.signal(sig, self)
 
 	def __call__(self, signum, frame):
-		self.listener.stop()
+		self.monitor.stop()
 		sys.exit(0)
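
SignalHandler relies only on the stdlib signal module: a callable instance is registered for each signal and later invoked with (signum, frame). A self-contained version of the same pattern, with a toy Worker standing in for the monitor:

    import signal
    import sys

    class Worker(object):
        def stop(self):
            print('shutting down gracefully...')

    class SignalHandler(object):
        def __init__(self, worker, signals=(signal.SIGINT, signal.SIGTERM)):
            self.worker = worker
            for sig in signals:
                signal.signal(sig, self)    # callable instances work as handlers

        def __call__(self, signum, frame):
            self.worker.stop()
            sys.exit(0)

    SignalHandler(Worker())
    # Ctrl-C (SIGINT) or SIGTERM now triggers Worker.stop() before exiting.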
@@ -203,17 +223,17 @@ if __name__ == '__main__':
 
 	### set up logging
 	logger = utils.get_logger(
-		'-'.join([options.tag, 'feature_listener']),
+		'-'.join([options.tag, 'feature_monitor']),
 		log_level=options.log_level,
 		rootdir=options.rootdir,
 		verbose=options.verbose
 	)
 
 	# create summary instance
-	listener = StreamListener(logger, options=options)
+	monitor = StreamMonitor(logger, options=options)
 
 	# install signal handler
-	SignalHandler(listener)
+	SignalHandler(monitor)
 
-	# start up listener
-	listener.start()
+	# start up monitor
+	monitor.start()