Commit db057ee1 authored by Patrick Godwin

add gstlal_feature_synchronizer + gstlal_feature_hdf5_sink to gstlal-burst for saving and synchronization of features from extractor
parent 02f1a02e
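
For context, a minimal sketch of how the two new executables might be invoked (option names are taken from their parsers below; the kafka server, topic names, and channel list path are placeholders):

    gstlal_feature_synchronizer \
        --num-topics 4 \
        --kafka-server <kafka-host:port> \
        --input-topic-basename gstlal_features \
        --output-topic-basename gstlal_features_combined \
        --tag test

    gstlal_feature_hdf5_sink \
        --instrument H1 \
        --channel-list <path/to/channel_list> \
        --kafka-server <kafka-host:port> \
        --input-topic-basename gstlal_features_combined \
        --tag test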
@@ -5,4 +5,6 @@ dist_bin_SCRIPTS = \
gstlal_feature_extractor_pipe \
gstlal_feature_extractor_pipe_online \
gstlal_feature_extractor_whitener_check \
gstlal_feature_extractor_template_overlap
gstlal_feature_extractor_template_overlap \
gstlal_feature_hdf5_sink \
gstlal_feature_synchronizer
#!/usr/bin/env python
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_hdf5_sink [--options]"
__description__ = "an executable to dump streaming data to disk via hdf5"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
# Preamble
#-------------------------------------------------
import itertools
import json
import os
import signal
import sys
import time
import shutil
from collections import deque
from optparse import OptionParser
from confluent_kafka import Consumer, KafkaError
import h5py
import numpy
from gstlal import multichannel_datasource
from gstlal import idq_utils
from gstlal import aggregator
from idq import io
from idq import logs
from idq import names
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = OptionParser(usage=__usage__, description=__description__)
parser.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
parser.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
parser.add_option("--basename", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Sets the basename for files written to disk. Default = GSTLAL_IDQ_TRIGGERS")
parser.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
parser.add_option("--write-cadence", type = "int", default = 100, help = "Rate at which the feature data is written to disk. Default = 100 seconds.")
parser.add_option("--persist-cadence", type = "int", default = 10000, help = "Rate at which new hdf5 files are written to disk. Default = 10000 seconds.")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class HDF5StreamSink(object):
"""
Handles the processing of incoming streaming features, saving datasets to disk in hdf5 format.
"""
def __init__(self, logger, options):
logger.info('setting up hdf5 stream sink...')
### initialize timing options
self.request_timeout = options.request_timeout
self.processing_cadence = options.processing_cadence
self.is_running = False
### kafka settings
self.kafka_settings = {'bootstrap.servers': options.kafka_server,
'group.id': 'group_1'}
### initialize consumers
self.consumer = Consumer(self.kafka_settings)
self.consumer.subscribe([options.input_topic_basename])
### initialize queues
self.feature_queue = deque(maxlen = 300)
### iDQ saving properties
self.write_cadence = options.write_cadence
self.tag = '%s-%s' % (options.instrument[:1], options.basename)
# get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
tmp_dir = os.environ['TMPDIR']
# set up keys needed to do processing
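# NOTE: keys are (channel, rate) pairs; for each channel the rates span octaves
# from f_low = min(32, f_high) up to f_high = min(2048, fsamp), which is assumed
# to match the set of downsampled rates produced by the feature extractor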
channel_dict = multichannel_datasource.channel_dict_from_channel_file(options.channel_list)
self.keys = {}
for channel in channel_dict.keys():
f_samp = int(channel_dict[channel]['fsamp'])
f_high = min(2048, f_samp)
f_low = min(32, f_high)
n_rates = int(numpy.log2(f_high/f_low) + 1)
rates = [f_low*2**i for i in range(n_rates)]
for rate in rates:
self.keys[(channel, rate)] = None
# hdf saving properties
self.rootdir = options.rootdir
self.write_cadence = options.write_cadence
self.persist_cadence = options.persist_cadence
self.last_save_time = {key:None for key in self.keys}
columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
self.feature_data = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.write_cadence)
self.feature_name = '%s-%d-5000000000' % (self.tag, int(aggregator.now()))
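### output directory layout: <rootdir>/<tag>/<tag>-<first five digits of gps start time>/<tag>-0001/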
trigger_path = os.path.join(self.tag, self.tag+"-"+str(self.feature_name.split("-")[2])[:5], self.tag+"-0001")
self.feature_path = os.path.join(os.path.abspath(self.rootdir), trigger_path)
self.tmp_path = os.path.join(tmp_dir, trigger_path)
# create temp and output directories if they don't exist
aggregator.makedir(self.feature_path)
aggregator.makedir(self.tmp_path)
# delete leftover temporary files
tmp_file = os.path.join(self.tmp_path, self.feature_name)+'.h5.tmp'
if os.path.isfile(tmp_file):
os.remove(tmp_file)
def fetch_data(self):
"""
requests a new message from an individual topic and adds it to the feature queue
"""
message = self.consumer.poll(timeout=self.request_timeout)
### only add to queue if no errors in receiving data
if message and not message.error():
### parse and add to queue
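### assumed message layout: a json object with 'timestamp' and 'features' keys,
### e.g. as produced by gstlal_feature_synchronizer's push_features()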
features = json.loads(message.value())
self.add_to_queue(features['timestamp'], features['features'])
def add_to_queue(self, timestamp, data):
"""
add a set of features for a given timestamp to the feature queue
"""
self.feature_queue.appendleft((timestamp, data))
def process_queue(self):
"""
takes data from the queue and adds it to datasets, periodically persisting them to disk
"""
while self.feature_queue:
### remove data with oldest timestamp and process
timestamp, features = self.feature_queue.pop()
logger.info('processing features for timestamp %d' % timestamp)
for feature in features:
channel = feature['channel']
rate = feature['rate']
key = (channel, rate)
### set save times appropriately
if self.last_save_time[key] is None:
self.last_save_time[key] = timestamp
### save new dataset to disk every save cadence
if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.write_cadence):
logger.info('saving dataset to disk for timestamp %d' % timestamp)
self.feature_data.dump(self.tmp_path, self.feature_name, idq_utils.floor_div(self.last_save_time[key], self.write_cadence), key = key, tmp = True)
self.last_save_time[key] = timestamp
### create new file every persist cadence
if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.persist_cadence):
logger.info('persisting file for gps range %d - %d' % (timestamp - self.persist_cadence, timestamp))
self.persist_to_disk()
### add new feature vector to dataset
self.feature_data.append(feature, key = key, buftime = timestamp)
def persist_to_disk(self):
"""
moves a file from its temporary location to its final location
"""
final_path = os.path.join(self.feature_path, self.feature_name)+".h5"
tmp_path = os.path.join(self.tmp_path, self.feature_name)+".h5.tmp"
shutil.move(tmp_path, final_path)
def start(self):
"""
starts ingesting data and saving features to disk
"""
logger.info('starting streaming hdf5 sink...')
self.is_running = True
while self.is_running:
### ingest and combine incoming feature subsets, dropping late data
self.fetch_data()
### push combined features downstream
while self.feature_queue:
self.process_queue()
### repeat with processing cadence
time.sleep(self.processing_cadence)
def stop(self):
"""
stops ingesting data and saves the remaining features to disk
"""
logger.info('shutting down hdf5 sink...')
self.persist_to_disk()
### FIXME: should also handle pushing rest of data in buffer
self.is_running = False
class SignalHandler(object):
"""
helper class to shut down the hdf5 sink gracefully before exiting
"""
def __init__(self, sink, signals = [signal.SIGINT, signal.SIGTERM]):
self.sink = sink
for sig in signals:
signal.signal(sig, self)
def __call__(self, signum, frame):
#print >>sys.stderr, "SIG %d received, attempting graceful shutdown..." % signum
self.sink.stop()
sys.exit(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
# parse arguments
options, args = parse_command_line()
### set up logging
logger = logs.get_logger(
names.tag2logname(options.tag, 'hdf5_sink'),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
)
# create hdf5 sink instance
sink = HDF5StreamSink(logger, options=options)
# install signal handler
SignalHandler(sink)
# start up hdf5 sink
sink.start()
#!/usr/bin/env python
# Copyright (C) 2017-2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_synchronizer [--options]"
__description__ = "an executable to synchronize incoming gstlal feature extractor streams and send downstream"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
# Preamble
#-------------------------------------------------
import itertools
import json
import signal
import sys
import time
from collections import deque
from Queue import PriorityQueue
from multiprocessing.dummy import Pool as ThreadPool
from optparse import OptionParser
import lal
from lal import LIGOTimeGPS
from confluent_kafka import Producer, Consumer, KafkaError
from idq import logs
from idq import names
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = OptionParser(usage=__usage__, description=__description__)
parser.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
parser.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename, i.e. {basename}_%02d. Required.")
parser.add_option("--output-topic-basename", metavar = "string", help = "Sets the output kafka topic name. Required.")
parser.add_option("--num-topics", type = "int", help = "Sets the number of input kafka topics to read from. Required.")
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class StreamSynchronizer(object):
"""
Handles the synchronization of several incoming streams, populating data queues
and pushing feature vectors to a queue for downstream processing.
"""
def __init__(self, logger, options):
logger.info('setting up stream synchronizer...')
### initialize timing options
self.processing_cadence = options.processing_cadence
self.request_timeout = options.request_timeout
self.latency_timeout = options.latency_timeout
self.is_running = False
### kafka settings
self.kafka_settings = {'bootstrap.servers': options.kafka_server}
self.num_topics = options.num_topics
### initialize consumers
self.consumer_names = ['%s_%s' % (options.input_topic_basename, str(i).zfill(2)) for i in range(1, self.num_topics + 1)]
# FIXME: hacky way of introducing group id, should be a settable option
consumer_kafka_settings = dict(self.kafka_settings)  # copy so 'group.id' does not leak into the producer settings
consumer_kafka_settings['group.id'] = 'group_1'
self.consumers = [Consumer(consumer_kafka_settings) for topic in self.consumer_names]
for topic, consumer in zip(self.consumer_names, self.consumers):
consumer.subscribe([topic])
### initialize producer
self.producer_name = options.output_topic_basename
self.producer = Producer(self.kafka_settings)
### initialize queues
self.feature_queue = PriorityQueue()
self.feature_buffer = deque(maxlen = 300)
def fetch_data(self, consumer):
"""
requests a new message from an individual topic and adds it to the feature queue
"""
message = consumer.poll(timeout=self.request_timeout)
### only add to queue if no errors in receiving data
if message and not message.error():
### decode json and parse data
feature_subset = json.loads(message.value())
### add to queue if timestamp is within timeout
if feature_subset['timestamp'] >= self.max_timeout():
self.add_to_queue(feature_subset['timestamp'], feature_subset['etg_data'])
def fetch_all_data(self):
"""
requests a new message from each topic and adds them to the feature queue
"""
pool = ThreadPool(self.num_topics)
result = pool.map_async(self.fetch_data, self.consumers)
result.wait()
pool.close()
def add_to_queue(self, timestamp, data):
"""
add a set of features for a given timestamp to the feature queue
"""
self.feature_queue.put((timestamp, data))
def process_queue(self):
"""
checks if conditions are right to combine new features for a given timestamp,
and if so, takes subsets from the feature queue, combines them, and pushes the
result to a buffer
"""
num_elems = min(self.num_topics, self.feature_queue.qsize())
timestamps = [block[0] for block in self.feature_queue.queue[:num_elems]]
### check if either all timestamps are identical, or if the timestamps
### are old enough to process regardless. if so, process elements from queue
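### e.g. with num_topics = 4: combine once all 4 subsets for the oldest timestamp
### have arrived, or once that timestamp is older than the latency timeout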
if timestamps:
if timestamps[0] <= self.max_timeout() or (len(set(timestamps)) == 1 and num_elems == self.num_topics):
### find number of elements to remove from queue
if timestamps[0] <= self.max_timeout():
num_subsets = len([timestamp for timestamp in timestamps if timestamp == timestamps[0]])
else:
num_subsets = num_elems
### remove data with oldest timestamp and process
subsets = [self.feature_queue.get() for i in range(num_subsets)]
logger.info('combining %d / %d feature subsets for timestamp %d' % (len(subsets),self.num_topics,timestamps[0]))
features = self.combine_subsets(subsets)
self.feature_buffer.appendleft((timestamps[0], features))
def combine_subsets(self, subsets):
"""
combine subsets of features from multiple streams in a sensible way
"""
datum = [subset[1] for subset in subsets]
return list(itertools.chain(*datum))
def push_features(self):
"""
pushes any features that have been combined downstream to the outgoing topic
"""
# push full feature vector to producer if buffer isn't empty
if self.feature_buffer:
timestamp, features = self.feature_buffer.pop()
logger.info('pushing features with timestamp %d downstream' % timestamp)
feature_packet = {'timestamp': timestamp, 'features': list(features)}
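### note: the 'timestamp' / 'features' packet layout here matches what
### gstlal_feature_hdf5_sink expects in its fetch_data()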
self.producer.produce(timestamp = timestamp, topic = self.producer_name, value = json.dumps(feature_packet))
def max_timeout(self):
"""
calculates the oldest timestamp allowed for incoming data
"""
return int(LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)) - self.latency_timeout
def synchronize(self):
"""
puts all the synchronization steps together and adds a timer based on the
processing cadence to run periodically
"""
while self.is_running:
### ingest and combine incoming feature subsets, dropping late data
self.fetch_all_data()
self.process_queue()
### push combined features downstream
while self.feature_buffer:
self.push_features()
### repeat with processing cadence
time.sleep(self.processing_cadence)
def start(self):
"""
starts the synchronization sequence
"""
logger.info('starting stream synchronizer for %d incoming feature streams...' % self.num_topics)
self.is_running = True
self.synchronize()
def stop(self):
"""
stops the synchronization sequence
"""
logger.info('shutting down stream synchronizer...')
### FIXME: should also handle pushing rest of data in buffer
self.is_running = False
class SignalHandler(object):
"""
helper class to shut down the synchronizer gracefully before exiting
"""
def __init__(self, synchronizer, signals = [signal.SIGINT, signal.SIGTERM]):
self.synchronizer = synchronizer
for sig in signals:
signal.signal(sig, self)
def __call__(self, signum, frame):
print >>sys.stderr, "SIG %d received, attempting graceful shutdown..." % signum
self.synchronizer.stop()
sys.exit(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
# parse arguments
options, args = parse_command_line()
### set up logging
logger = logs.get_logger(
names.tag2logname(options.tag, 'synchronizer'),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
)
# create ETG synchronizer instance
synchronizer = StreamSynchronizer(logger, options=options)
# install signal handler
SignalHandler(synchronizer)
# start up synchronizer
synchronizer.start()