Skip to content
Snippets Groups Projects
Commit de983ae3 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_feature_monitor: read in channel list to list missing channels in buffers

parent b8d9815c
No related branches found
No related tags found
No related merge requests found
......@@ -37,6 +37,7 @@ from confluent_kafka import Consumer, KafkaError
from ligo.scald import io
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import utils
#-------------------------------------------------
......@@ -55,6 +56,7 @@ def parse_command_line():
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--num-channels", type = "int", help = "Set the full number of channels being processed upstream, used for monitoring purposes.")
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the monitor acquires and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
......@@ -128,6 +130,11 @@ class StreamMonitor(object):
reduce_across_tags=False,
)
### determine channels to be processed
name, _ = options.channel_list.rsplit('.', 1)
self.channels = set(multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys())
def fetch_data(self):
"""
requests for a new message from an individual topic,
......@@ -160,6 +167,12 @@ class StreamMonitor(object):
timestamp, features = self.feature_queue.pop()
latency = utils.gps2latency(timestamp)
### check for missing channels
these_channels = set(features.keys())
missing_channels = self.channels - these_channels
if missing_channels:
logger.info('channels missing @ timestamp=%.3f: %s' % (timestamp, repr(list(missing_channels))))
### generate metrics
metrics['time'].append(timestamp)
metrics['latency'].append(latency)
......@@ -176,7 +189,7 @@ class StreamMonitor(object):
self.agg_sink.store_and_reduce('target_snr', {'synchronizer': [metrics['target_time'], metrics['target_snr']]}, 'data', tags='job', aggregates=self.data_type)
self.last_save = timestamp
logger.info('processed features up to timestamp %.3f, max latency = %.3f s' % (timestamp, max(metrics['latency'])))
logger.info('processed features up to timestamp %.3f, max latency = %.3f s, percent missing channels = %.3f' % (timestamp, max(metrics['latency']), max(metrics['percent_missed'])))
def start(self):
"""
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.