Commit 2b6c1f86 authored by Patrick Godwin

update gstlal_feature_aggregator, gstlal_feature_monitor for ligo-scald API changes

parent 537ef476
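
In short, the ligo-scald API change this commit tracks: the backend-specific InfluxDBAggregator and HDF5Aggregator classes are renamed to a common Aggregator in their respective modules, and the column/tag layout moves out of every store_columns() call into a one-time register_schema() call at setup. A minimal sketch of the new calling pattern, assuming the "from ligo.scald import io" import used by these scripts; the connection values and sample payload below are illustrative, not taken from the diff:

    # sketch only: Aggregator/register_schema/store_columns come from the
    # diff below; hostname/port/db and the payload values are made up
    from ligo.scald import io

    # new-style constructor (was io.influx.InfluxDBAggregator)
    agg_sink = io.influx.Aggregator(
        hostname='localhost',
        port=8086,
        db='gstlal_features',
        reduce_across_tags=False,
    )

    # declare the measurement's column/tag layout once, up front...
    agg_sink.register_schema('latency', columns='data', column_key='data', tags='job', tag_key='job')

    # ...so each store call only names the measurement, payload, and aggregate
    # mode ('max' here stands in for the scripts' self.data_type)
    metric_data = {'job0': {'time': [1187008882], 'fields': {'data': [0.4]}}}
    agg_sink.store_columns('latency', metric_data, aggregate='max')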
gstlal_feature_aggregator
@@ -112,19 +112,23 @@ class StreamAggregator(object):
 ### set up aggregator
 logger.info("setting up aggregator with backend: %s"%options.data_backend)
 if options.data_backend == 'influx':
-    self.agg_sink = io.influx.InfluxDBAggregator(
+    self.agg_sink = io.influx.Aggregator(
         hostname=options.influx_hostname,
         port=options.influx_port,
         db=options.influx_database_name,
         reduce_across_tags=False,
     )
 else: ### hdf5 data backend
-    self.agg_sink = io.hdf5.HDF5Aggregator(
+    self.agg_sink = io.hdf5.Aggregator(
         rootdir=options.rootdir,
         num_processes=options.num_processes,
         reduce_across_tags=False,
     )
+
+### define measurements to be stored from aggregators
+self.agg_sink.register_schema('latency', columns='data', column_key='data', tags='job', tag_key='job')
+self.agg_sink.register_schema('snr', columns='data', column_key='data', tags='channel', tag_key='channel')
 
 def fetch_data(self, job_consumer):
     """
     requests for a new message from an individual topic,
@@ -163,11 +167,11 @@ class StreamAggregator(object):
 
 ### store and aggregate metrics
 metric_data = {job: {'time': metrics['time'], 'fields': {'data': metrics['latency']}} for job, metrics in all_metrics.items()}
-self.agg_sink.store_columns('latency', metric_data, 'data', tags='job', aggregate=self.data_type)
+self.agg_sink.store_columns('latency', metric_data, aggregate=self.data_type)
 
 ### store and aggregate features
 timeseries_data = {channel: {'time': timeseries['trigger_time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()}
-self.agg_sink.store_columns('snr', timeseries_data, 'data', tags='channel', aggregate=self.data_type)
+self.agg_sink.store_columns('snr', timeseries_data, aggregate=self.data_type)
 
 try:
     max_latency = max(max(metrics['latency']) for metrics in all_metrics.values())
@@ -242,7 +246,7 @@ if __name__ == '__main__':
 
 ### set up logging
 logger = utils.get_logger(
-    '-'.join([options.tag, 'feature_aggregator']),
+    '-'.join([options.tag, 'feature_aggregator', options.jobs[0]]),
     log_level=options.log_level,
     rootdir=options.rootdir,
     verbose=options.verbose
gstlal_feature_monitor
@@ -117,14 +117,14 @@ class StreamMonitor(object):
 ### set up aggregator
 logger.info("setting up monitor with backend: %s"%options.data_backend)
 if options.data_backend == 'influx':
-    self.agg_sink = io.influx.InfluxDBAggregator(
+    self.agg_sink = io.influx.Aggregator(
         hostname=options.influx_hostname,
         port=options.influx_port,
         db=options.influx_database_name,
         reduce_across_tags=False,
     )
 else: ### hdf5 data backend
-    self.agg_sink = io.hdf5.HDF5Aggregator(
+    self.agg_sink = io.hdf5.Aggregator(
         rootdir=options.rootdir,
         num_processes=options.num_processes,
         reduce_across_tags=False,
@@ -134,6 +134,9 @@ class StreamMonitor(object):
 
 name, _ = options.channel_list.rsplit('.', 1)
 self.channels = set(multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys())
 
+### define measurements to be stored
+for metric in ('target_snr', 'synchronizer_latency', 'percent_missed'):
+    self.agg_sink.register_schema(metric, columns='data', column_key='data', tags='job', tag_key='job')
 def fetch_data(self):
     """
@@ -185,10 +188,10 @@ class StreamMonitor(object):
 
 ### store and aggregate features
 for metric in ('synchronizer_latency', 'percent_missed'):
     data = {'time': metrics['time'], 'fields': {'data': metrics[metric]}}
-    self.agg_sink.store_columns(metric, {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
+    self.agg_sink.store_columns(metric, {'synchronizer': data}, aggregate=self.data_type)
 if len(metrics['target_time']) > 0:
     data = {'time': metrics['target_time'], 'fields': {'data': metrics['target_snr']}}
-    self.agg_sink.store_columns('target_snr', {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
+    self.agg_sink.store_columns('target_snr', {'synchronizer': data}, aggregate=self.data_type)
 self.last_save = timestamp
 logger.info('processed features up to timestamp %.3f, max latency = %.3f s, percent missing channels = %.3f' % (timestamp, max(metrics['synchronizer_latency']), max(metrics['percent_missed'])))
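
Renaming both backends to a common Aggregator means the two code paths now share one calling sequence; only construction differs, as both files above show. A sketch of the same flow under the hdf5 backend, with the same caveats as before (rootdir, num_processes, channel name, and payload values are illustrative):

    from ligo.scald import io

    # hdf5-backed aggregator (was io.hdf5.HDF5Aggregator)
    agg_sink = io.hdf5.Aggregator(
        rootdir='/path/to/rootdir',
        num_processes=2,
        reduce_across_tags=False,
    )

    # schema registration and storage are identical to the influx backend
    agg_sink.register_schema('snr', columns='data', column_key='data', tags='channel', tag_key='channel')
    timeseries_data = {'H1:FAKE-CHANNEL': {'time': [1187008882.5], 'fields': {'data': [5.5]}}}
    agg_sink.store_columns('snr', timeseries_data, aggregate='max')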