diff --git a/gstlal-burst/bin/gstlal_feature_aggregator b/gstlal-burst/bin/gstlal_feature_aggregator index 30fe6903a5d28dfb6f450c789773210ac834b32b..aff27803a07a6f4f0f63f3cb813d0eb4e48f4031 100755 --- a/gstlal-burst/bin/gstlal_feature_aggregator +++ b/gstlal-burst/bin/gstlal_feature_aggregator @@ -127,7 +127,7 @@ class StreamAggregator(object): ### define measurements to be stored from aggregators self.agg_sink.register_schema('latency', columns='data', column_key='data', tags='job', tag_key='job') - self.agg_sink.register_schema('snr', columns='data', column_key='data', tags='channel', tag_key='channel') + self.agg_sink.register_schema('snr', columns='data', column_key='data', tags=('channel', 'subsystem'), tag_key='channel') def fetch_data(self, job_consumer): """ @@ -170,7 +170,7 @@ class StreamAggregator(object): self.agg_sink.store_columns('latency', metric_data, aggregate=self.data_type) ### store and aggregate features - timeseries_data = {channel: {'time': timeseries['trigger_time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()} + timeseries_data = {(channel, self._channel_to_subsystem(channel)): {'time': timeseries['trigger_time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()} self.agg_sink.store_columns('snr', timeseries_data, aggregate=self.data_type) try: @@ -222,6 +222,13 @@ class StreamAggregator(object): logger.info('shutting down feature aggregator...') self.is_running = False + @staticmethod + def _channel_to_subsystem(channel): + """ + given a channel, returns the subsystem the channel lives in + """ + return channel.split(':')[1].split('-')[0] + class SignalHandler(object): """ helper class to shut down the stream aggregator gracefully before exiting