Commit 517934ba authored by Patrick Godwin

update gstlal_feature_aggregator, gstlal_feature_monitor to deal with ligo-scald API change

parent eca2d16c
Pipeline #51540 failed in 7 minutes and 18 seconds
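The ligo-scald API change replaces store_and_reduce(), which took a [times, values] list per key, with store_columns(), which takes a column-oriented dict with 'time' and 'fields' entries, and renames the aggregates keyword to aggregate. Below is a minimal sketch of the payload shape change using made-up sample data rather than anything from the repository; the agg_sink object is assumed to be set up elsewhere, as in the scripts, and the calls are shown only as comments.

    # Illustrative sample data only, not taken from the repository.
    times = [1234567890.0, 1234567891.0]
    latencies = [0.5, 0.7]

    # old payload shape passed to store_and_reduce (removed in this commit):
    old_payload = {'job1': [times, latencies]}

    # new column-oriented payload shape passed to store_columns (added in this commit):
    new_payload = {'job1': {'time': times, 'fields': {'data': latencies}}}

    # old call (list payload, plural keyword):
    #   agg_sink.store_and_reduce('latency', old_payload, 'data', tags='job', aggregates=data_type)
    # new call (column payload, singular keyword):
    #   agg_sink.store_columns('latency', new_payload, 'data', tags='job', aggregate=data_type)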
gstlal_feature_aggregator
@@ -64,7 +64,7 @@ def parse_command_line():
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--data-type", action="append", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default all")
group.add_option("--data-type", metavar = "string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default: max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
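Note that --data-type also changes from a repeatable option (action="append", defaulting to all aggregates) to a single string with a documented default of 'max', matching the singular aggregate keyword used further down. A small, self-contained optparse sketch of the new behaviour follows; the fallback to 'max' is an assumption based on the help text, since the default is applied outside the lines shown here.

    from optparse import OptionParser

    # Minimal sketch of the new single-valued option, not the full parser from the script.
    parser = OptionParser()
    parser.add_option("--data-type", metavar="string",
        help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default: max")
    options, args = parser.parse_args(["--data-type", "median"])

    # assumed fallback to the documented default when the option is not given
    data_type = options.data_type if options.data_type else "max"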
@@ -162,12 +162,12 @@ class StreamAggregator(object):
all_timeseries, all_metrics = self.packets_to_timeseries(feature_packets)
### store and aggregate metrics
-metric_data = {job: [metrics['time'], metrics['latency']] for job, metrics in all_metrics.items()}
-self.agg_sink.store_and_reduce('latency', metric_data, 'data', tags='job', aggregates=self.data_type)
+metric_data = {job: {'time': metrics['time'], 'fields': {'data': metrics['latency']}} for job, metrics in all_metrics.items()}
+self.agg_sink.store_columns('latency', metric_data, 'data', tags='job', aggregate=self.data_type)
### store and aggregate features
-timeseries_data = {channel: [timeseries['trigger_time'], timeseries['snr']] for channel, timeseries in all_timeseries.items()}
-self.agg_sink.store_and_reduce('snr', timeseries_data, 'data', tags='channel', aggregates=self.data_type)
+timeseries_data = {channel: {'time': timeseries['trigger_time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()}
+self.agg_sink.store_columns('snr', timeseries_data, 'data', tags='channel', aggregate=self.data_type)
try:
max_latency = max(max(metrics['latency']) for metrics in all_metrics.values())
......
gstlal_feature_monitor
@@ -65,7 +65,7 @@ def parse_command_line():
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--data-type", action="append", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default all")
group.add_option("--data-type", metavar="string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default = max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
@@ -175,7 +175,7 @@ class StreamMonitor(object):
### generate metrics
metrics['time'].append(timestamp)
-metrics['latency'].append(latency)
+metrics['synchronizer_latency'].append(latency)
metrics['percent_missed'].append(100 * (float(self.num_channels - len(features.keys())) / self.num_channels))
if features.has_key(self.target_channel):
@@ -183,13 +183,15 @@ class StreamMonitor(object):
metrics['target_snr'].append(features[self.target_channel][0]['snr'])
### store and aggregate features
-for metric in ('latency', 'percent_missed'):
-self.agg_sink.store_and_reduce(metric, {'synchronizer': [metrics['time'], metrics[metric]]}, 'data', tags='job', aggregates=self.data_type)
+for metric in ('synchronizer_latency', 'percent_missed'):
+data = {'time': metrics['time'], 'fields': {'data': metrics[metric]}}
+self.agg_sink.store_columns(metric, {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
if len(metrics['target_time']) > 0:
-self.agg_sink.store_and_reduce('target_snr', {'synchronizer': [metrics['target_time'], metrics['target_snr']]}, 'data', tags='job', aggregates=self.data_type)
+data = {'time': metrics['target_time'], 'fields': {'data': metrics['target_snr']}}
+self.agg_sink.store_columns('target_snr', {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
self.last_save = timestamp
-logger.info('processed features up to timestamp %.3f, max latency = %.3f s, percent missing channels = %.3f' % (timestamp, max(metrics['latency']), max(metrics['percent_missed'])))
+logger.info('processed features up to timestamp %.3f, max latency = %.3f s, percent missing channels = %.3f' % (timestamp, max(metrics['synchronizer_latency']), max(metrics['percent_missed'])))
def start(self):
"""
......
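Taken together, both scripts now build the same {'time': ..., 'fields': {'data': ...}} structure before calling store_columns. A hypothetical helper, not part of this commit, could repack an old-style payload into that layout if any other caller still produces the list form:

    # Hypothetical helper, not part of this commit: repack an old-style
    # {key: [times, values]} payload into the column layout used with store_columns.
    def to_columns(payload):
        return {key: {'time': times, 'fields': {'data': values}}
                for key, (times, values) in payload.items()}

    # e.g. to_columns({'synchronizer': [[1234567890.0], [0.5]]})
    # -> {'synchronizer': {'time': [1234567890.0], 'fields': {'data': [0.5]}}}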