Commit 974dfa7e authored by Patrick Godwin

add gstlal_feature_aggregator to aggregate incoming features online

parent c9dcd7a6
@@ -2,6 +2,7 @@ dist_bin_SCRIPTS = \
 	gstlal_cs_triggergen \
 	gstlal_excesspower \
 	gstlal_excesspower_trigvis \
+	gstlal_feature_aggregator \
 	gstlal_feature_extractor \
 	gstlal_feature_extractor_pipe \
 	gstlal_ll_feature_extractor_pipe \
...
#!/usr/bin/env python
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

__usage__ = "gstlal_feature_aggregator [--options]"
__description__ = "an executable to aggregate and generate job metrics for streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"

#-------------------------------------------------
# Preamble
#-------------------------------------------------

from collections import defaultdict, deque
import json
from multiprocessing.dummy import Pool as ThreadPool
import optparse
import os
import signal
import sys
import time

import numpy

from confluent_kafka import Consumer, KafkaError

from datamon import aggregator
from datamon import io

from gstlal.fxtools import utils

#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
    parser = optparse.OptionParser(usage=__usage__, description=__description__)

    group = optparse.OptionGroup(parser, "Listener Options", "General settings for configuring the listener.")
    group.add_option("-v", "--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to the automatically generated log.")
    group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
    group.add_option("--rootdir", metavar = "path", default = ".", help = "Location where log messages and the sqlite database live.")
    group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'.")
    group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output; must be a power of 2. Default = 1 Hz.")
    group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the aggregator acquires and processes data. Default = 0.1 seconds.")
    group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
    group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
    group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
    group.add_option("--jobs", action = "append", help = "Specify jobs to process. Can be given multiple times.")
    group.add_option("--data-backend", default = "hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. Default = hdf5.")
    group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
    group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
    group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
    group.add_option("--data-type", action = "append", help = "Specify the data types to aggregate, from 'min', 'max', 'median'. Can be given multiple times. Default = all.")
    group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently. Default = 2.")
    parser.add_option_group(group)

    options, args = parser.parse_args()

    return options, args
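
### An illustrative invocation based on the options above (the server address,
### topic basename, job IDs, and database name shown here are placeholders, not
### values taken from this repository):
###
###   gstlal_feature_aggregator --kafka-server localhost:9092 \
###       --input-topic-basename gstlal_features --jobs 0001 --jobs 0002 \
###       --data-backend influx --influx-hostname localhost --influx-port 8086 \
###       --influx-database-name gstlal_features --data-type median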
#-------------------------------------------------
# Classes
#-------------------------------------------------

class StreamAggregator(object):
    """
    Ingests and aggregates incoming streaming features, collects job metrics.
    """
    def __init__(self, logger, options):
        logger.info('setting up feature aggregator...')

        ### initialize timing options
        self.request_timeout = options.request_timeout
        self.processing_cadence = options.processing_cadence
        self.sample_rate = options.sample_rate
        self.is_running = False

        ### kafka settings
        self.kafka_settings = {
            'bootstrap.servers': options.kafka_server,
            'group.id': 'aggregator_%s_%s' % (options.tag, options.jobs[0])
        }

        ### other aggregator options
        self.data_type = options.data_type

        ### initialize consumers, one per job, each subscribed to its own topic
        self.jobs = options.jobs
        self.consumer_names = ['%s_%s' % (options.input_topic_basename, job) for job in self.jobs]
        self.job_consumers = [(job, Consumer(self.kafka_settings)) for job in self.jobs]
        for topic, (job, consumer) in zip(self.consumer_names, self.job_consumers):
            consumer.subscribe([topic])

        ### initialize 30 second queue for incoming buffers
        self.feature_queue = {job: deque(maxlen = 30 * self.sample_rate) for job in self.jobs}

        ### set up aggregator
        logger.info("setting up aggregator with backend: %s..." % options.data_backend)
        if options.data_backend == 'influx':
            self.agg_sink = io.influx.InfluxDBAggregator(
                hostname=options.influx_hostname,
                port=options.influx_port,
                db=options.influx_database_name
            )
        else: ### hdf5 data backend
            self.agg_sink = io.hdf5.HDF5Aggregator(
                rootdir=options.rootdir,
                num_processes=options.num_processes
            )
    def fetch_data(self, job_consumer):
        """
        requests a new message from an individual topic
        and adds it to the feature queue
        """
        job, consumer = job_consumer
        message = consumer.poll(timeout=self.request_timeout)

        ### only add to queue if no errors in receiving data
        if message and not message.error():
            ### decode json and parse data
            feature_subset = json.loads(message.value())
            self.feature_queue[job].appendleft((feature_subset['timestamp'], feature_subset['features']))
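
    ### NOTE: each message is expected to be a JSON object of roughly the form
    ### (the channel name and values here are illustrative only):
    ###
    ###   {"timestamp": 1234567890.0,
    ###    "features": {"H1:SOME-CHANNEL": [{"trigger_time": 1234567890.25, "snr": 6.3, ...}, ...], ...}}
    ###
    ### i.e. a GPS timestamp plus per-channel lists of feature rows; process_queue()
    ### relies on the 'trigger_time' and 'snr' columns of each row.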
    def fetch_all_data(self):
        """
        requests a new message from all topics and adds
        them to the feature queue
        """
        pool = ThreadPool(len(self.job_consumers))
        result = pool.map_async(self.fetch_data, self.job_consumers)
        result.wait()
        pool.close()
    def process_queue(self):
        """
        process and aggregate features on a regular cadence
        """
        for job in self.jobs:
            num_packets = len(self.feature_queue[job])

            ### process only if at least 1 second of data in queue
            if num_packets >= self.sample_rate:
                feature_packets = [self.feature_queue[job].pop() for i in range(num_packets)]
                timestamps, channels, all_timeseries = self.packets_to_timeseries(feature_packets)

                if timestamps:
                    ### compute latencies from the packet timestamps and store them
                    latencies = [utils.gps2latency(timestamp) for timestamp in timestamps]
                    logger.info('processing features for job: %s, gps range: %.3f - %.3f, latency: %.3f s' % (job, timestamps[0], timestamps[-1], max(latencies)))
                    self.agg_sink.store_and_reduce('latency', {job: [timestamps, latencies]}, 'data', tags='job', aggregates=self.data_type)

                    ### aggregate features
                    for channel, timeseries in zip(channels, all_timeseries):
                        self.agg_sink.store_and_reduce('snr', {channel: [timeseries['trigger_time'], timeseries['snr']]}, 'data', tags='channel', aggregates=self.data_type)
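
    ### NOTE: two kinds of timeseries are pushed to the aggregation sink above:
    ###   * per-job latencies, tagged by 'job'
    ###   * per-channel SNRs sampled at each row's trigger_time, tagged by 'channel'
    ### both are reduced with the aggregates selected via --data-type.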
    def packets_to_timeseries(self, packets):
        """
        splits up a series of packets into ordered timeseries, keyed by channel
        """
        ### process each packet sequentially and split rows by channel
        timestamps = []
        channel_rows = defaultdict(list)
        for timestamp, packet in packets:
            timestamps.append(timestamp)
            for channel, row in packet.items():
                channel_rows[channel].extend(row)

        ### break up rows into timeseries
        timeseries = {}
        for channel, rows in channel_rows.items():
            timeseries[channel] = {column: [row[column] for row in rows] for column in rows[0].keys()}

        return timestamps, timeseries.keys(), timeseries.values()
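
    ### For example (illustrative values), packets of the form
    ###   [(t0, {"H1:SOME-CHANNEL": [{"trigger_time": t0, "snr": 5.0}]}),
    ###    (t1, {"H1:SOME-CHANNEL": [{"trigger_time": t1, "snr": 7.0}]})]
    ### are flattened into column-oriented timeseries keyed by channel:
    ###   {"H1:SOME-CHANNEL": {"trigger_time": [t0, t1], "snr": [5.0, 7.0]}}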
    def start(self):
        """
        starts ingestion and aggregation of features
        """
        logger.info('starting feature listener...')
        self.is_running = True
        while self.is_running:
            ### ingest incoming features
            self.fetch_all_data()

            ### push combined features downstream
            self.process_queue()

            ### repeat with processing cadence
            time.sleep(self.processing_cadence)
    def stop(self):
        """
        shut down gracefully
        """
        logger.info('shutting down feature aggregator...')
        self.is_running = False
        ### close all kafka consumers
        for job, consumer in self.job_consumers:
            consumer.close()

class SignalHandler(object):
    """
    helper class to shut down the stream aggregator gracefully before exiting
    """
    def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
        self.listener = listener
        for sig in signals:
            signal.signal(sig, self)

    def __call__(self, signum, frame):
        self.listener.stop()
        sys.exit(0)

#-------------------------------------------------
# Main
#-------------------------------------------------

if __name__ == '__main__':
    # parse arguments
    options, args = parse_command_line()

    ### set up logging
    logger = utils.get_logger(
        '-'.join([options.tag, 'feature_aggregator']),
        log_level=options.log_level,
        rootdir=options.rootdir,
        verbose=options.verbose
    )

    # create summary instance
    aggregator = StreamAggregator(logger, options=options)

    # install signal handler
    SignalHandler(aggregator)

    # start up listener
    aggregator.start()