diff --git a/gstlal-burst/bin/gstlal_feature_aggregator b/gstlal-burst/bin/gstlal_feature_aggregator
index 890a5d0d9e39f88e42f2c19f0f7566739fb02fca..0798bf6937998c14c7bd3773c107343b52617ad6 100755
--- a/gstlal-burst/bin/gstlal_feature_aggregator
+++ b/gstlal-burst/bin/gstlal_feature_aggregator
@@ -97,6 +97,7 @@ class StreamAggregator(object):
 
 		### other aggregator options
 		self.data_type = options.data_type
+		self.last_save = aggregator.now()
 
 		### initialize consumers
 		self.jobs = options.jobs
@@ -109,17 +110,19 @@ class StreamAggregator(object):
 		self.feature_queue = {job: deque(maxlen = 30 * self.sample_rate) for job in self.jobs}
 
 		### set up aggregator
-		logger.info("setting up aggregator with backend: %s..."%options.data_backend)
+		logger.info("setting up aggregator with backend: %s"%options.data_backend)
 		if options.data_backend == 'influx':
 			self.agg_sink = io.influx.InfluxDBAggregator(
 				hostname=options.influx_hostname,
 				port=options.influx_port,
-				db=options.influx_database_name
+				db=options.influx_database_name,
+				reduce_across_tags=False,
 			)
 		else: ### hdf5 data backend
 			self.agg_sink = io.hdf5.HDF5Aggregator(
 				rootdir=options.rootdir,
-				num_processes=options.num_processes
+				num_processes=options.num_processes,
+				reduce_across_tags=False,
 			)
 
 	def fetch_data(self, job_consumer):
@@ -151,34 +154,40 @@ class StreamAggregator(object):
 		"""
 		process and aggregate features on a regular cadence
 		"""
-		for job in self.jobs:
-			num_packets = len(self.feature_queue[job])
-
-			### process only if at least 1 second of data in queue
-			if num_packets >= self.sample_rate:
-				feature_packets = [self.feature_queue[job].pop() for i in range(num_packets)]
-				timestamps, channels, all_timeseries = self.packets_to_timeseries(feature_packets)
-
-				if timestamps:
-					### remove data with oldest timestamp and process
-					latencies = [utils.gps2latency(timestamp) for timestamp in timestamps]
-					logger.info('processing features for job: %s, gps range: %.3f - %.3f, latency: %.3f s' % (job, timestamps[0], timestamps[-1], max(latencies)))
-					self.agg_sink.store_and_reduce('latency', {job: [timestamps, latencies]}, 'data', tags='job', aggregates=self.data_type)
-
-					### aggregate features
-					for channel, timeseries in zip(channels, all_timeseries):
-						self.agg_sink.store_and_reduce('snr', {channel: [timeseries['trigger_time'], timeseries['snr']]}, 'data', tags='channel', aggregates=self.data_type)
+		if utils.in_new_epoch(aggregator.now(), self.last_save, 1):
+			self.last_save = aggregator.now()
+
+			### format incoming packets into metrics and timeseries
+			feature_packets = [(job, self.feature_queue[job].pop()) for job in self.jobs for i in range(len(self.feature_queue[job]))]
+			all_timeseries, all_metrics = self.packets_to_timeseries(feature_packets)
+
+			### store and aggregate metrics
+			metric_data = {job: [metrics['time'], metrics['latency']] for job, metrics in all_metrics.items()}
+			self.agg_sink.store_and_reduce('latency', metric_data, 'data', tags='job', aggregates=self.data_type)
+
+			### store and aggregate features
+			timeseries_data = {channel: [timeseries['trigger_time'], timeseries['snr']] for channel, timeseries in all_timeseries.items()}
+			self.agg_sink.store_and_reduce('snr', timeseries_data, 'data', tags='channel', aggregates=self.data_type)
+
+			try:
+				max_latency = max(max(metrics['latency']) for metrics in all_metrics.values())
+				logger.info('processed features at time %d, highest latency is %.3f' % (self.last_save, max_latency))
+			except:
+				logger.info('no features to process at time %d' % self.last_save)
 
 	def packets_to_timeseries(self, packets):
 		"""
 		splits up a series of packets into ordered timeseries, keyed by channel
 		"""
+		metrics = defaultdict(lambda: {'time': [], 'latency': []})
+
 		### process each packet sequentially and split rows by channel
-		timestamps = []
 		channel_rows = defaultdict(list)
-		for timestamp, packet in packets:
-			timestamps.append(timestamp)
-			for channel, row in packet.items():
+		for job, packet in packets:
+			timestamp, features = packet
+			metrics[job]['time'].append(timestamp)
+			metrics[job]['latency'].append(utils.gps2latency(timestamp))
+			for channel, row in features.items():
 				channel_rows[channel].extend(row)
 
 		### break up rows into timeseries
@@ -186,13 +195,13 @@ class StreamAggregator(object):
 		for channel, rows in channel_rows.items():
 			timeseries[channel] = {column: [row[column] for row in rows] for column in rows[0].keys()}
 
-		return timestamps, timeseries.keys(), timeseries.values()
+		return timeseries, metrics
 
 	def start(self):
 		"""
 		starts ingestion and aggregation of features
 		"""
-		logger.info('starting feature listener...')
+		logger.info('starting feature aggregator...')
 		self.is_running = True
 		while self.is_running:
 			### ingest incoming features
@@ -207,19 +216,18 @@ class StreamAggregator(object):
 		shut down gracefully
 		"""
 		logger.info('shutting down feature aggregator...')
-		self.conn.close()
 
 
 class SignalHandler(object):
 	"""
 	helper class to shut down the stream aggregator gracefully before exiting
 	"""
-	def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
-		self.listener = listener
+	def __init__(self, aggregator_sink, signals = [signal.SIGINT, signal.SIGTERM]):
+		self.aggregator_sink = aggregator_sink
 		for sig in signals:
 			signal.signal(sig, self)
 
 	def __call__(self, signum, frame):
-		self.listener.stop()
+		self.aggregator_sink.stop()
 		sys.exit(0)
 
@@ -240,10 +248,10 @@ if __name__ == '__main__':
 	)
 
 	# create summary instance
-	aggregator = StreamAggregator(logger, options=options)
+	aggregator_sink = StreamAggregator(logger, options=options)
 
 	# install signal handler
-	SignalHandler(aggregator)
+	SignalHandler(aggregator_sink)
 
 	# start up listener
-	aggregator.start()
+	aggregator_sink.start()