From 974dfa7edbda443d843a4f76e2fa8a7ce5d9cc3c Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Tue, 12 Feb 2019 09:01:18 -0800
Subject: [PATCH] add gstlal_feature_aggregator to aggregate incoming features
 online

---
 gstlal-burst/bin/Makefile.am               |   1 +
 gstlal-burst/bin/gstlal_feature_aggregator | 249 +++++++++++++++++++++
 2 files changed, 250 insertions(+)
 create mode 100755 gstlal-burst/bin/gstlal_feature_aggregator

diff --git a/gstlal-burst/bin/Makefile.am b/gstlal-burst/bin/Makefile.am
index 9a04e30624..7c1788ecf3 100644
--- a/gstlal-burst/bin/Makefile.am
+++ b/gstlal-burst/bin/Makefile.am
@@ -2,6 +2,7 @@ dist_bin_SCRIPTS = \
 	gstlal_cs_triggergen \
 	gstlal_excesspower \
 	gstlal_excesspower_trigvis \
+	gstlal_feature_aggregator \
 	gstlal_feature_extractor \
 	gstlal_feature_extractor_pipe \
 	gstlal_ll_feature_extractor_pipe \
diff --git a/gstlal-burst/bin/gstlal_feature_aggregator b/gstlal-burst/bin/gstlal_feature_aggregator
new file mode 100755
index 0000000000..890a5d0d9e
--- /dev/null
+++ b/gstlal-burst/bin/gstlal_feature_aggregator
@@ -0,0 +1,249 @@
+#!/usr/bin/env python
+
+# Copyright (C) 2018  Patrick Godwin
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+__usage__ = "gstlal_feature_aggregator [--options]"
+__description__ = "an executable to aggregate and generate job metrics for streaming features"
+__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
+
+#-------------------------------------------------
+#                  Preamble
+#-------------------------------------------------
+
+from collections import defaultdict, deque
+import json
+from multiprocessing.dummy import Pool as ThreadPool
+import optparse
+import os
+import signal
+import sys
+import time
+
+import numpy
+
+from confluent_kafka import Consumer, KafkaError
+
+from datamon import aggregator
+from datamon import io
+
+from gstlal.fxtools import utils
+
+#-------------------------------------------------
+#                  Functions
+#-------------------------------------------------
+
+def parse_command_line():
+
+    parser = optparse.OptionParser(usage=__usage__, description=__description__)
+    group = optparse.OptionGroup(parser, "Listener Options", "General settings for configuring the listener.")
+    group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
+    group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
+    group.add_option("--rootdir", metavar = "path", default = ".", help = "Location where log messages and sqlite database lives")
+    group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
+    group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
+    group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
+    group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
+    group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
+    group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
+    group.add_option("--jobs", action="append", help="Specify jobs to process. Can be given multiple times.")
+    group.add_option("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
+    group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
+    group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+    group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
+    group.add_option("--data-type", action="append", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default all")
+    group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
+    parser.add_option_group(group)
+
+    options, args = parser.parse_args()
+
+    return options, args
+
+#-------------------------------------------------
+#                   Classes
+#-------------------------------------------------
+
+class StreamAggregator(object):
+    """
+    Ingests and aggregates incoming streaming features, collects job metrics.
+    """
+    def __init__(self, logger, options):
+        logger.info('setting up feature aggregator...')
+
+        ### initialize timing options
+        self.request_timeout = options.request_timeout
+        self.processing_cadence = options.processing_cadence
+        self.sample_rate = options.sample_rate
+        self.is_running = False
+
+        ### kafka settings
+        self.kafka_settings = {
+            'bootstrap.servers': options.kafka_server,
+            'group.id': 'aggregator_%s_%s' % (options.tag, options.jobs[0])
+        }
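+        ### note: the same settings dict is reused for every consumer below, so all
+        ### consumers in this process share a group.id keyed by the tag and first job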
+
+        ### other aggregator options
+        self.data_type = options.data_type
+
+        ### initialize consumers
+        self.jobs = options.jobs
+        self.consumer_names = ['%s_%s' % (options.input_topic_basename, job) for job in self.jobs]
+        self.job_consumers = [(job, Consumer(self.kafka_settings)) for job in self.jobs]
+        for topic, (job, consumer) in zip(self.consumer_names, self.job_consumers):
+            consumer.subscribe([topic])
+
+        ### initialize 30 second queue for incoming buffers
+        self.feature_queue = {job: deque(maxlen = 30 * self.sample_rate) for job in self.jobs}
+
+        ### set up aggregator
+        logger.info("setting up aggregator with backend: %s..." % options.data_backend)
+        if options.data_backend == 'influx':
+            self.agg_sink = io.influx.InfluxDBAggregator(
+                hostname=options.influx_hostname,
+                port=options.influx_port,
+                db=options.influx_database_name
+            )
+        else: ### hdf5 data backend
+            self.agg_sink = io.hdf5.HDF5Aggregator(
+                rootdir=options.rootdir,
+                num_processes=options.num_processes
+            )
+
+    def fetch_data(self, job_consumer):
+        """
+        requests a new message from an individual topic
+        and adds it to the feature queue
+        """
+        job, consumer = job_consumer
+        message = consumer.poll(timeout=self.request_timeout)
+
+        ### only add to queue if no errors in receiving data
+        if message and not message.error():
+
+            ### decode json and parse data
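+            ### messages are assumed to carry JSON of the form
+            ### {'timestamp': <gps time>, 'features': {<channel>: [<feature rows>, ...], ...}}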
+            feature_subset = json.loads(message.value())
+            self.feature_queue[job].appendleft((feature_subset['timestamp'], feature_subset['features']))
+
+    def fetch_all_data(self):
+        """
+        requests new messages from all topics and adds
+        them to the feature queue
+        """
+        pool = ThreadPool(len(self.job_consumers))
+        result = pool.map_async(self.fetch_data, self.job_consumers)
+        result.wait()
+        pool.close()
+
+    def process_queue(self):
+        """
+        process and aggregate features on a regular cadence
+        """
+        for job in self.jobs:
+            num_packets = len(self.feature_queue[job])
+
+            ### process only if at least 1 second of data in queue
+            if num_packets >= self.sample_rate:
+                feature_packets = [self.feature_queue[job].pop() for _ in range(num_packets)]
+                timestamps, channels, all_timeseries = self.packets_to_timeseries(feature_packets)
+
+                if timestamps:
+                    ### compute per-timestamp latencies and store the job latency metric
+                    latencies = [utils.gps2latency(timestamp) for timestamp in timestamps]
+                    logger.info('processing features for job: %s, gps range: %.3f - %.3f, latency: %.3f s' % (job, timestamps[0], timestamps[-1], max(latencies)))
+                    self.agg_sink.store_and_reduce('latency', {job: [timestamps, latencies]}, 'data', tags='job', aggregates=self.data_type)
+
+                    ### aggregate features
+                    for channel, timeseries in zip(channels, all_timeseries):
+                        self.agg_sink.store_and_reduce('snr', {channel: [timeseries['trigger_time'], timeseries['snr']]}, 'data', tags='channel', aggregates=self.data_type)
+
+    def packets_to_timeseries(self, packets):
+        """
+        splits up a series of packets into ordered timeseries, keyed by channel
+        """
+        ### process each packet sequentially and split rows by channel
+        timestamps = []
+        channel_rows = defaultdict(list)
+        for timestamp, packet in packets:
+            timestamps.append(timestamp)
+            for channel, row in packet.items():
+                channel_rows[channel].extend(row)
+
+        ### break up rows into timeseries
+        timeseries = {}
+        for channel, rows in channel_rows.items():
+            timeseries[channel] = {column: [row[column] for row in rows] for column in rows[0].keys()}
+
+        return timestamps, timeseries.keys(), timeseries.values()
+
+    def start(self):
+        """
+        starts ingestion and aggregation of features
+        """
+        logger.info('starting feature listener...')
+        self.is_running = True
+        while self.is_running:
+            ### ingest incoming features
+            self.fetch_all_data()
+            ### push combined features downstream
+            self.process_queue()
+            ### repeat with processing cadence
+            time.sleep(self.processing_cadence)
+
+    def stop(self):
+        """
+        shut down gracefully
+        """
+        logger.info('shutting down feature aggregator...')
+        self.is_running = False
+        for job, consumer in self.job_consumers:
+            consumer.close()
+
+class SignalHandler(object):
+    """
+    helper class to shut down the stream aggregator gracefully before exiting
+    """
+    def __init__(self, listener, signals = [signal.SIGINT, signal.SIGTERM]):
+        self.listener = listener
+        for sig in signals:
+            signal.signal(sig, self)
+
+    def __call__(self, signum, frame):
+        self.listener.stop()
+        sys.exit(0)
+
+
+#-------------------------------------------------
+#                    Main
+#-------------------------------------------------
+
+if __name__ == '__main__':
+    # parse arguments
+    options, args = parse_command_line()
+
+    ### set up logging
+    logger = utils.get_logger(
+        '-'.join([options.tag, 'feature_aggregator']),
+        log_level=options.log_level,
+        rootdir=options.rootdir,
+        verbose=options.verbose
+    )
+
+    # create the aggregator instance
+    aggregator = StreamAggregator(logger, options=options)
+
+    # install signal handler
+    SignalHandler(aggregator)
+
+    # start up listener
+    aggregator.start()
-- 
GitLab