Commit efc42702 authored by Patrick Godwin

gstlal_feature_aggregator: modify process_queue() and packets_to_timeseries() to process more channels in parallel, fix typos
parent f549b9de
@@ -97,6 +97,7 @@ class StreamAggregator(object):
         ### other aggregator options
         self.data_type = options.data_type
+        self.last_save = aggregator.now()
 
         ### initialize consumers
         self.jobs = options.jobs
@@ -109,17 +110,19 @@ class StreamAggregator(object):
         self.feature_queue = {job: deque(maxlen = 30 * self.sample_rate) for job in self.jobs}
 
         ### set up aggregator
-        logger.info("setting up aggregator with backend: %s..."%options.data_backend)
+        logger.info("setting up aggregator with backend: %s"%options.data_backend)
         if options.data_backend == 'influx':
             self.agg_sink = io.influx.InfluxDBAggregator(
                 hostname=options.influx_hostname,
                 port=options.influx_port,
-                db=options.influx_database_name
+                db=options.influx_database_name,
+                reduce_across_tags=False,
             )
         else: ### hdf5 data backend
             self.agg_sink = io.hdf5.HDF5Aggregator(
                 rootdir=options.rootdir,
-                num_processes=options.num_processes
+                num_processes=options.num_processes,
+                reduce_across_tags=False,
             )
 
     def fetch_data(self, job_consumer):
@@ -151,34 +154,40 @@ class StreamAggregator(object):
         """
         process and aggregate features on a regular cadence
         """
-        for job in self.jobs:
-            num_packets = len(self.feature_queue[job])
-
-            ### process only if at least 1 second of data in queue
-            if num_packets >= self.sample_rate:
-                feature_packets = [self.feature_queue[job].pop() for i in range(num_packets)]
-                timestamps, channels, all_timeseries = self.packets_to_timeseries(feature_packets)
-
-                if timestamps:
-                    ### remove data with oldest timestamp and process
-                    latencies = [utils.gps2latency(timestamp) for timestamp in timestamps]
-                    logger.info('processing features for job: %s, gps range: %.3f - %.3f, latency: %.3f s' % (job, timestamps[0], timestamps[-1], max(latencies)))
-                    self.agg_sink.store_and_reduce('latency', {job: [timestamps, latencies]}, 'data', tags='job', aggregates=self.data_type)
-
-                    ### aggregate features
-                    for channel, timeseries in zip(channels, all_timeseries):
-                        self.agg_sink.store_and_reduce('snr', {channel: [timeseries['trigger_time'], timeseries['snr']]}, 'data', tags='channel', aggregates=self.data_type)
+        if utils.in_new_epoch(aggregator.now(), self.last_save, 1):
+            self.last_save = aggregator.now()
+
+            ### format incoming packets into metrics and timeseries
+            feature_packets = [(job, self.feature_queue[job].pop()) for job in self.jobs for i in range(len(self.feature_queue[job]))]
+            all_timeseries, all_metrics = self.packets_to_timeseries(feature_packets)
+
+            ### store and aggregate metrics
+            metric_data = {job: [metrics['time'], metrics['latency']] for job, metrics in all_metrics.items()}
+            self.agg_sink.store_and_reduce('latency', metric_data, 'data', tags='job', aggregates=self.data_type)
+
+            ### store and aggregate features
+            timeseries_data = {channel: [timeseries['trigger_time'], timeseries['snr']] for channel, timeseries in all_timeseries.items()}
+            self.agg_sink.store_and_reduce('snr', timeseries_data, 'data', tags='channel', aggregates=self.data_type)
+
+            try:
+                max_latency = max(max(metrics['latency']) for metrics in all_metrics.values())
+                logger.info('processed features at time %d, highest latency is %.3f' % (self.last_save, max_latency))
+            except:
+                logger.info('no features to process at time %d' % self.last_save)
 
     def packets_to_timeseries(self, packets):
         """
         splits up a series of packets into ordered timeseries, keyed by channel
         """
+        metrics = defaultdict(lambda: {'time': [], 'latency': []})
+
         ### process each packet sequentially and split rows by channel
-        timestamps = []
         channel_rows = defaultdict(list)
-        for timestamp, packet in packets:
-            timestamps.append(timestamp)
-            for channel, row in packet.items():
+        for job, packet in packets:
+            timestamp, features = packet
+            metrics[job]['time'].append(timestamp)
+            metrics[job]['latency'].append(utils.gps2latency(timestamp))
+            for channel, row in features.items():
                 channel_rows[channel].extend(row)
 
         ### break up rows into timeseries
@@ -186,13 +195,13 @@ class StreamAggregator(object):
         for channel, rows in channel_rows.items():
             timeseries[channel] = {column: [row[column] for row in rows] for column in rows[0].keys()}
 
-        return timestamps, timeseries.keys(), timeseries.values()
+        return timeseries, metrics
 
     def start(self):
         """
        starts ingestion and aggregation of features
         """
-        logger.info('starting feature listener...')
+        logger.info('starting feature aggregator...')
        self.is_running = True
        while self.is_running:
            ### ingest incoming features
@@ -207,19 +216,18 @@ class StreamAggregator(object):
         shut down gracefully
         """
         logger.info('shutting down feature aggregator...')
-        self.conn.close()
 
 
 class SignalHandler(object):
     """
     helper class to shut down the stream aggregator gracefully before exiting
     """
-    def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
-        self.listener = listener
+    def __init__(self, aggregator_sink, signals = [signal.SIGINT, signal.SIGTERM]):
+        self.aggregator_sink = aggregator_sink
 
         for sig in signals:
             signal.signal(sig, self)
 
     def __call__(self, signum, frame):
-        self.listener.stop()
+        self.aggregator_sink.stop()
         sys.exit(0)
@@ -240,10 +248,10 @@ if __name__ == '__main__':
     )
 
     # create summary instance
-    aggregator = StreamAggregator(logger, options=options)
+    aggregator_sink = StreamAggregator(logger, options=options)
 
     # install signal handler
-    SignalHandler(aggregator)
+    SignalHandler(aggregator_sink)
 
     # start up listener
-    aggregator.start()
+    aggregator_sink.start()
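
For reference, the sketch below mirrors the grouping that the new packets_to_timeseries() performs: per-job packets are split into per-channel timeseries and per-job latency metrics, which process_queue() then hands to the sink as one batched dictionary per measurement instead of one store call per channel. It is a standalone, illustrative sketch only: the gps2latency() stub, the job IDs, and the channel names are assumptions, and the real script relies on gstlal's utils and aggregator modules plus live consumers.

# Minimal, self-contained sketch of the grouping done by the new packets_to_timeseries().
# gps2latency() is a stand-in for utils.gps2latency(); job IDs and channel names are made up.
from collections import defaultdict

def gps2latency(gps_time, now=1187000010.0):
    # seconds elapsed since the GPS timestamp, measured against a fixed "now"
    # so the example is reproducible
    return now - gps_time

def packets_to_timeseries(packets):
    # packets: iterable of (job, (timestamp, features)) pairs, mirroring the diff
    metrics = defaultdict(lambda: {'time': [], 'latency': []})
    channel_rows = defaultdict(list)
    for job, (timestamp, features) in packets:
        metrics[job]['time'].append(timestamp)
        metrics[job]['latency'].append(gps2latency(timestamp))
        for channel, rows in features.items():
            channel_rows[channel].extend(rows)
    # flatten each channel's rows into column-wise lists, keyed by column name
    timeseries = {
        channel: {column: [row[column] for row in rows] for column in rows[0].keys()}
        for channel, rows in channel_rows.items()
    }
    return timeseries, metrics

packets = [
    ('0001', (1187000000.0, {'H1:FAKE-CHANNEL_A': [{'trigger_time': 1187000000.1, 'snr': 5.2}]})),
    ('0002', (1187000001.0, {'H1:FAKE-CHANNEL_A': [{'trigger_time': 1187000001.3, 'snr': 6.0}],
                             'H1:FAKE-CHANNEL_B': [{'trigger_time': 1187000001.4, 'snr': 4.8}]})),
]
all_timeseries, all_metrics = packets_to_timeseries(packets)

# the batched payloads are then passed to the sink in a single call per measurement,
# e.g. agg_sink.store_and_reduce('snr', timeseries_data, ...) in the diff above
metric_data = {job: [m['time'], m['latency']] for job, m in all_metrics.items()}
timeseries_data = {ch: [ts['trigger_time'], ts['snr']] for ch, ts in all_timeseries.items()}
print(metric_data)
print(timeseries_data)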