From db057ee1b29b4af5e207f77d9605ecd855f73dfd Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Thu, 28 Jun 2018 12:12:50 -0700
Subject: [PATCH] add gstlal_feature_synchronizer + gstlal_feature_hdf5_sink to
 gstlal-burst for saving and synchronization of features from extractor

---
 gstlal-burst/bin/Makefile.am                 |   4 +-
 gstlal-burst/bin/gstlal_feature_hdf5_sink    | 268 +++++++++++++++++++
 gstlal-burst/bin/gstlal_feature_synchronizer | 257 ++++++++++++++++++
 3 files changed, 528 insertions(+), 1 deletion(-)
 create mode 100755 gstlal-burst/bin/gstlal_feature_hdf5_sink
 create mode 100755 gstlal-burst/bin/gstlal_feature_synchronizer

diff --git a/gstlal-burst/bin/Makefile.am b/gstlal-burst/bin/Makefile.am
index 58cd60b325..b394205d4e 100644
--- a/gstlal-burst/bin/Makefile.am
+++ b/gstlal-burst/bin/Makefile.am
@@ -5,4 +5,6 @@ dist_bin_SCRIPTS = \
 	gstlal_feature_extractor_pipe \
 	gstlal_feature_extractor_pipe_online \
 	gstlal_feature_extractor_whitener_check \
-	gstlal_feature_extractor_template_overlap
+	gstlal_feature_extractor_template_overlap \
+	gstlal_feature_hdf5_sink \
+	gstlal_feature_synchronizer
diff --git a/gstlal-burst/bin/gstlal_feature_hdf5_sink b/gstlal-burst/bin/gstlal_feature_hdf5_sink
new file mode 100755
index 0000000000..5d365358d3
--- /dev/null
+++ b/gstlal-burst/bin/gstlal_feature_hdf5_sink
@@ -0,0 +1,268 @@
+#!/usr/bin/env python
+
+# Copyright (C) 2018  Patrick Godwin
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+__usage__ = "gstlal_feature_hdf5_sink [--options]"
+__description__ = "an executable to dump streaming data to disk via hdf5"
+__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
+
+#-------------------------------------------------
+#                  Preamble
+#-------------------------------------------------
+
+import itertools
+import json
+import os
+import signal
+import sys
+import time
+import shutil
+from collections import deque
+from optparse import OptionParser
+
+from confluent_kafka import Consumer, KafkaError
+import h5py
+import numpy
+
+from gstlal import multichannel_datasource
+from gstlal import idq_utils
+from gstlal import aggregator
+
+from idq import io
+from idq import logs
+from idq import names
+
+#-------------------------------------------------
+#                  Functions
+#-------------------------------------------------
+
+def parse_command_line():
+
+    parser = OptionParser(usage=__usage__, description=__description__)
+    parser.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to the automatically generated log.")
+    parser.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
+    parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
+    parser.add_option("--basename", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Sets the basename for files written to disk. Default = GSTLAL_IDQ_TRIGGERS")
+    parser.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
+    parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
+    parser.add_option("--channel-list", type="string", metavar = "name", help = "Sets the list of channels to process, given as --channel-list=location/to/file.")
+    parser.add_option("--write-cadence", type = "int", default = 100, help = "Rate at which the feature data is written to disk. Default = 100 seconds.")
+    parser.add_option("--persist-cadence", type = "int", default = 10000, help = "Rate at which new hdf5 files are written to disk. Default = 10000 seconds.")
+    parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the sink acquires and processes data. Default = 0.1 seconds.")
+    parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
+    parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
+    parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
+
+    options, args = parser.parse_args()
+
+    return options, args
+
+#-------------------------------------------------
+#                   Classes
+#-------------------------------------------------
+
+class HDF5StreamSink(object):
+    """
+    Handles the processing of incoming streaming features, saving datasets to disk in hdf5 format.
+    """
+    def __init__(self, logger, options):
+        logger.info('setting up hdf5 stream sink...')
+
+        ### initialize timing options
+        self.request_timeout = options.request_timeout
+        self.processing_cadence = options.processing_cadence
+        self.is_running = False
+
+        ### kafka settings
+        self.kafka_settings = {'bootstrap.servers': options.kafka_server,
+                               'group.id': 'group_1'}
+
+        ### initialize consumers
+        self.consumer = Consumer(self.kafka_settings)
+        self.consumer.subscribe([options.input_topic_basename])
+
+        ### initialize queues
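+        # bounded queue: if more than 300 feature subsets back up, the oldest unprocessed entries are silently dropped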
+        self.feature_queue = deque(maxlen = 300)
+
+        ### iDQ saving properties
+        self.write_cadence = options.write_cadence
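+        # observatory prefix + basename, e.g. 'H-GSTLAL_IDQ_TRIGGERS', following the LIGO file naming convention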
+        self.tag = '%s-%s' % (options.instrument[:1], options.basename)
+
+        # get base temp directory
+        if '_CONDOR_SCRATCH_DIR' in os.environ:
+            tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
+        else:
+            tmp_dir = os.environ['TMPDIR']
+
+        # set up keys needed to do processing
+        channel_dict = multichannel_datasource.channel_dict_from_channel_file(options.channel_list)
+        self.keys = {}
+        for channel in channel_dict.keys():
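+            # generate the dyadic set of sampling rates processed for this channel, from min(32, fsamp) Hz up to min(2048, fsamp) Hz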
+            f_samp = int(channel_dict[channel]['fsamp'])
+            f_high = min(2048, f_samp)
+            f_low = min(32, f_high)
+            n_rates = int(numpy.log2(f_high/f_low) + 1)
+            rates = [f_low*2**i for i in range(n_rates)]
+            for rate in rates:
+                self.keys[(channel, rate)] = None
+
+        # hdf saving properties
+        self.rootdir = options.rootdir
+        self.write_cadence = options.write_cadence
+        self.persist_cadence = options.persist_cadence
+        self.last_save_time = {key:None for key in self.keys}
+        columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
+        self.feature_data = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.write_cadence)
+
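+        # output filename stem: '<tag>-<GPS start time>-<duration>'; the duration field is hard-coded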
+        self.feature_name = '%s-%d-5000000000' % (self.tag, int(aggregator.now()))
+
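+        # directory layout: <tag>/<tag>-<first 5 digits of GPS start>/<tag>-0001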
+        trigger_path = os.path.join(self.tag, self.tag+"-"+str(self.feature_name.split("-")[2])[:5], self.tag+"-0001")
+        self.feature_path = os.path.join(os.path.abspath(self.rootdir), trigger_path)
+        self.tmp_path = os.path.join(tmp_dir, trigger_path)
+
+        # create temp and output directories if they don't exist
+        aggregator.makedir(self.feature_path)
+        aggregator.makedir(self.tmp_path)
+
+        # delete leftover temporary files
+        tmp_file = os.path.join(self.tmp_path, self.feature_name)+'.h5.tmp'
+        if os.path.isfile(tmp_file):
+            os.remove(tmp_file)
+
+    def fetch_data(self):
+        """
+        requests a new message from the input topic
+        and adds it to the feature queue
+        """
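+        # poll() returns None if no message arrives within the timeout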
+        message = self.consumer.poll(timeout=self.request_timeout)
+
+        ### only add to queue if no errors in receiving data
+        if message and not message.error():
+
+            ### parse and add to queue
+            features = json.loads(message.value())
+            self.add_to_queue(features['timestamp'], features['features'])
+
+    def add_to_queue(self, timestamp, data):
+        """
+        add a set of features for a given timestamp to the feature queue
+        """
+        self.feature_queue.appendleft((timestamp, data))
+
+    def process_queue(self):
+        """
+        takes data from the queue and adds it to datasets, periodically persisting them to disk
+        """
+
+        while self.feature_queue:
+            ### remove data with oldest timestamp and process
+            timestamp, features = self.feature_queue.pop()
+            logger.info('processing features for timestamp %d' % timestamp)
+
+            for feature in features:
+                channel = feature['channel']
+                rate = feature['rate']
+                key = (channel, rate)
+
+                ### set save times appropriately
+                if self.last_save_time[key] is None:
+                    self.last_save_time[key] = timestamp
+
+                ### save new dataset to disk every save cadence
+                if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.write_cadence):
+                    logger.info('saving dataset to disk for timestamp %d' % timestamp)
+                    self.feature_data.dump(self.tmp_path, self.feature_name, idq_utils.floor_div(self.last_save_time[key], self.write_cadence), key = key, tmp = True)
+                    self.last_save_time[key] = timestamp
+
+                ### create new file every persist cadence
+                if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.persist_cadence):
+                    logger.info('persisting file for GPS range %d - %d' % (timestamp - self.persist_cadence, timestamp))
+                    self.persist_to_disk()
+
+                ### add new feature vector to dataset
+                self.feature_data.append(feature, key = key, buftime = timestamp)
+
+    def persist_to_disk(self):
+        """
+        moves a file from its temporary to final position
+        """
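+        # move the temporary '.h5.tmp' file into place as the final '.h5' file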
+        final_path = os.path.join(self.feature_path, self.feature_name)+".h5"
+        tmp_path = os.path.join(self.tmp_path, self.feature_name)+".h5.tmp"
+        shutil.move(tmp_path, final_path)
+
+    def start(self):
+        """
+        starts ingesting data and saving features to disk
+        """
+        logger.info('starting streaming hdf5 sink...')
+        self.is_running = True
+        while self.is_running:
+            ### ingest incoming feature subsets
+            self.fetch_data()
+            ### process features and write to disk
+            while self.feature_queue:
+                self.process_queue()
+            ### repeat with processing cadence
+            time.sleep(self.processing_cadence)
+
+    def stop(self):
+        """
+        stops ingesting data and saves the remaining features to disk
+        """
+        logger.info('shutting down hdf5 sink...')
+        self.persist_to_disk()
+        ### FIXME: should also handle pushing rest of data in buffer
+        self.is_running = False
+
+class SignalHandler(object):
+    """
+    helper class to shut down the hdf5 sink gracefully before exiting
+    """
+    def __init__(self, sink, signals = [signal.SIGINT, signal.SIGTERM]):
+        self.sink = sink
+        for sig in signals:
+            signal.signal(sig, self)
+
+    def __call__(self, signum, frame):
+        #print >>sys.stderr, "SIG %d received, attempting graceful shutdown..." % signum
+        self.sink.stop()
+        sys.exit(0)
+
+#-------------------------------------------------
+#                    Main
+#-------------------------------------------------
+
+if __name__ == '__main__':
+    # parse arguments
+    options, args = parse_command_line()
+
+    ### set up logging
+    logger = logs.get_logger(
+        names.tag2logname(options.tag, 'hdf5_sink'),
+        log_level=options.log_level,
+        rootdir=options.rootdir,
+        verbose=options.verbose
+    )
+
+    # create hdf5 sink instance
+    sink = HDF5StreamSink(logger, options=options)
+
+    # install signal handler
+    SignalHandler(sink)
+
+    # start up hdf5 sink
+    sink.start()
diff --git a/gstlal-burst/bin/gstlal_feature_synchronizer b/gstlal-burst/bin/gstlal_feature_synchronizer
new file mode 100755
index 0000000000..93025e748d
--- /dev/null
+++ b/gstlal-burst/bin/gstlal_feature_synchronizer
@@ -0,0 +1,257 @@
+#!/usr/bin/env python
+
+# Copyright (C) 2017-2018  Patrick Godwin
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+__usage__ = "gstlal_feature_synchronizer [--options]"
+__description__ = "an executable to synchronize incoming gstlal feature extractor streams and send them downstream"
+__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
+
+#-------------------------------------------------
+#                  Preamble
+#-------------------------------------------------
+
+import itertools
+import json
+import signal
+import sys
+import time
+from collections import deque
+from Queue import PriorityQueue
+from multiprocessing.dummy import Pool as ThreadPool
+from optparse import OptionParser
+
+import lal
+from lal import LIGOTimeGPS
+
+from confluent_kafka import Producer, Consumer, KafkaError
+
+from idq import logs
+from idq import names
+
+#-------------------------------------------------
+#                  Functions
+#-------------------------------------------------
+
+def parse_command_line():
+
+    parser = OptionParser(usage=__usage__, description=__description__)
+    parser.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to the automatically generated log.")
+    parser.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
+    parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
+    parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
+    parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
+    parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
+    parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
+    parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
+    parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename, i.e. {basename}_%02d. Required.")
+    parser.add_option("--output-topic-basename", metavar = "string", help = "Sets the output kafka topic name. Required.")
+    parser.add_option("--num-topics", type = "int", help = "Sets the number of input kafka topics to read from. Required.")
+
+    options, args = parser.parse_args()
+
+    return options, args
+
+#-------------------------------------------------
+#                   Classes
+#-------------------------------------------------
+
+class StreamSynchronizer(object):
+    """
+    Handles the synchronization of several incoming streams, populating data queues
+    and pushing feature vectors to a queue for downstream processing.
+    """
+    def __init__(self, logger, options):
+        logger.info('setting up stream synchronizer...')
+
+        ### initialize timing options
+        self.processing_cadence = options.processing_cadence
+        self.request_timeout = options.request_timeout
+        self.latency_timeout = options.latency_timeout
+        self.is_running = False
+
+        ### kafka settings
+        self.kafka_settings = {'bootstrap.servers': options.kafka_server}
+        self.num_topics = options.num_topics
+
+        ### initialize consumers
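+        # input topics are named '<basename>_01' ... '<basename>_NN', one per incoming feature stream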
+        self.consumer_names = ['%s_%s' % (options.input_topic_basename, str(i).zfill(2)) for i in range(1, self.num_topics + 1)]
+        # FIXME: hacky way of introducing group id, should be a settable option
+        # copy the settings so the consumer-only 'group.id' does not leak into the producer configuration
+        consumer_kafka_settings = dict(self.kafka_settings)
+        consumer_kafka_settings['group.id'] = 'group_1'
+        self.consumers = [Consumer(consumer_kafka_settings) for topic in self.consumer_names]
+        for topic, consumer in zip(self.consumer_names, self.consumers):
+            consumer.subscribe([topic])
+
+        ### initialize producer
+        self.producer_name = options.output_topic_basename
+        self.producer = Producer(self.kafka_settings)
+
+        ### initialize queues
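+        # PriorityQueue orders (timestamp, data) entries so the oldest timestamp is always retrieved first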
+        self.feature_queue = PriorityQueue()
+        self.feature_buffer = deque(maxlen = 300)
+
+    def fetch_data(self, consumer):
+        """
+        requests a new message from an individual topic
+        and adds it to the feature queue
+        """
+        message = consumer.poll(timeout=self.request_timeout)
+
+        ### only add to queue if no errors in receiving data
+        if message and not message.error():
+
+            ### decode json and parse data
+            feature_subset = json.loads(message.value())
+
+            ### add to queue if timestamp is within timeout
+            if feature_subset['timestamp'] >= self.max_timeout():
+                self.add_to_queue(feature_subset['timestamp'], feature_subset['etg_data'])
+
+    def fetch_all_data(self):
+        """
+        requests new messages from all topics and adds
+        them to the feature queue
+        """
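+        # poll all consumers in parallel threads so a slow or empty topic does not block the others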
+        pool = ThreadPool(self.num_topics)
+        result = pool.map_async(self.fetch_data, self.consumers)
+        result.wait()
+        pool.close()
+
+    def add_to_queue(self, timestamp, data):
+        """
+        add a set of features for a given timestamp to the feature queue
+        """
+        self.feature_queue.put((timestamp, data))
+
+    def process_queue(self):
+        """
+        checks if conditions are right to combine new features for a given timestamp,
+        and if so, takes subsets from the feature queue, combines them, and pushes the
+        result to a buffer
+        """
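+        # peek at the queue's underlying heap without removing entries; the first entry holds the oldest timestamp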
+        num_elems = min(self.num_topics, self.feature_queue.qsize())
+        timestamps = [block[0] for block in self.feature_queue.queue[:num_elems]]
+
+        ### check if either all timestamps are identical, or if the timestamps
+        ### are old enough to process regardless. if so, process elements from queue
+        if timestamps:
+            if timestamps[0] <= self.max_timeout() or (len(set(timestamps)) == 1 and num_elems == self.num_topics):
+                ### find number of elements to remove from queue
+                if timestamps[0] <= self.max_timeout():
+                    num_subsets = len([timestamp for timestamp in timestamps if timestamp == timestamps[0]])
+                else:
+                    num_subsets = num_elems
+
+                ### remove data with oldest timestamp and process
+                subsets = [self.feature_queue.get() for i in range(num_subsets)]
+                logger.info('combining %d / %d feature subsets for timestamp %d' % (len(subsets),self.num_topics,timestamps[0]))
+                features = self.combine_subsets(subsets)
+                self.feature_buffer.appendleft((timestamps[0], features))
+
+    def combine_subsets(self, subsets):
+        """
+        combine subsets of features from multiple streams in a sensible way
+        """
+        datum = [subset[1] for subset in subsets]
+        return list(itertools.chain(*datum))
+
+    def push_features(self):
+        """
+        pushes any features that have been combined downstream in an outgoing topic
+        """
+        # push full feature vector to producer if buffer isn't empty
+        if self.feature_buffer:
+            timestamp, features = self.feature_buffer.pop()
+            logger.info('pushing features with timestamp %d downstream' % timestamp)
+            feature_packet = {'timestamp': timestamp, 'features': list(features)}
+            self.producer.produce(timestamp = timestamp, topic = self.producer_name, value = json.dumps(feature_packet))
+
+    def max_timeout(self):
+        """
+        calculates the oldest timestamp allowed for incoming data
+        """
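+        # current GPS time minus the allowed latency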
+        return int(LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)) - self.latency_timeout
+
+    def synchronize(self):
+        """
+        puts all the synchronization steps together and adds a timer based on the
+        processing cadence to run periodically
+        """
+        while self.is_running:
+            ### ingest and combine incoming feature subsets, dropping late data
+            self.fetch_all_data()
+            self.process_queue()
+            ### push combined features downstream
+            while self.feature_buffer:
+                self.push_features()
+            ### repeat with processing cadence
+            time.sleep(self.processing_cadence)
+
+    def start(self):
+        """
+        starts the synchronization sequence
+        """
+        logger.info('starting stream synchronizer for %d incoming feature streams...' % self.num_topics)
+        self.is_running = True
+        self.synchronize()
+
+    def stop(self):
+        """
+        stops the synchronization sequence
+        """
+        logger.info('shutting down stream synchronizer...')
+        ### FIXME: should also handle pushing rest of data in buffer
+        self.is_running = False
+
+class SignalHandler(object):
+    """
+    helper class to shut down the synchronizer gracefully before exiting
+    """
+    def __init__(self, synchronizer, signals = [signal.SIGINT, signal.SIGTERM]):
+        self.synchronizer = synchronizer
+        for sig in signals:
+            signal.signal(sig, self)
+
+    def __call__(self, signum, frame):
+        print >>sys.stderr, "SIG %d received, attempting graceful shutdown..." % signum
+        self.synchronizer.stop()
+        sys.exit(0)
+
+#-------------------------------------------------
+#                    Main
+#-------------------------------------------------
+
+if __name__ == '__main__':
+    # parse arguments
+    options, args = parse_command_line()
+
+    ### set up logging
+    logger = logs.get_logger(
+        names.tag2logname(options.tag, 'synchronizer'),
+        log_level=options.log_level,
+        rootdir=options.rootdir,
+        verbose=options.verbose
+    )
+
+    # create ETG synchronizer instance
+    synchronizer = StreamSynchronizer(logger, options=options)
+
+    # install signal handler
+    SignalHandler(synchronizer)
+
+    # start up synchronizer
+    synchronizer.start()
-- 
GitLab