Commit 4ffdc12e authored by Patrick Godwin


gstlal_feature_extractor: changed the use_kafka option to be a save_format choice instead, so that only one data transfer/saving option runs at a given time; fixed an issue with hdf5 file saving where timestamps were repeated; cleaned up finalization of saving features to disk when the pipeline finishes; changed some default save/persist cadences
parent fbd43e5b
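To make the hdf5 bookkeeping mentioned above concrete, here is a small self-contained sketch of the indexing arithmetic the reworked saving code relies on. floor_div is re-implemented only for illustration and is assumed to match idq_utils.floor_div; the cadence values are the new defaults introduced below.

def floor_div(x, n):
    """Round x down to the nearest multiple of n (assumed behaviour of idq_utils.floor_div)."""
    return (x // n) * n

cadence = 20           # new default --cadence
persist_cadence = 200  # new default --persist-cadence

# each integer GPS timestamp inside a cadence window maps to a unique row offset,
# so a given second can no longer be written twice into the same buffer
for timestamp in (1187000000, 1187000001, 1187000019):
    idx = timestamp - floor_div(timestamp, cadence)
    print(timestamp, idx)   # -> offsets 0, 1, 19

# the first hdf5 file is sized to end at the next persist-cadence boundary
feature_start_time = 1187000005
duration = floor_div(feature_start_time + persist_cadence, persist_cadence) - feature_start_time
print(duration)   # -> 195, i.e. up to the next multiple of 200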
@@ -235,12 +235,15 @@ class MultiChannelHandler(simplehandler.Handler):
self.subset_id = str(kwargs.pop("subset_id")).zfill(4)
### iDQ saving properties
self.timestamp = None
self.last_save_time = None
self.last_persist_time = None
self.cadence = options.cadence
self.persist_cadence = options.persist_cadence
self.feature_start_time = options.feature_start_time
self.feature_end_time = options.feature_end_time
self.columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
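# queue that gathers per-channel feature rows into per-timestamp buffers (see FeatureQueue in idq_utils)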
self.feature_queue = idq_utils.FeatureQueue(self.keys, self.columns, self.sample_rate)
# set whether data source is live
self.is_live = data_source_info.data_source in data_source_info.live_sources
@@ -251,38 +254,31 @@ class MultiChannelHandler(simplehandler.Handler):
else:
self.tmp_dir = os.environ['TMPDIR']
# row properties
columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
# feature saving properties
if options.save_format == 'hdf5':
self.fdata = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
duration = idq_utils.floor_div(self.feature_start_time + self.persist_cadence, self.persist_cadence) - self.feature_start_time
self.set_hdf_file_properties(self.feature_start_time, duration)
self.fdata = idq_utils.HDF5FeatureData(self.columns, keys = self.keys, cadence = self.cadence)
elif options.save_format == 'ascii':
# create header for trigger file
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel")
self.fdata = deque(maxlen = 25000)
self.fdata.append(self.header)
# set up stream related properties
self.stream_event = idq_utils.FeatureQueue(self.keys, columns, self.sample_rate, self.num_samples)
# set up bottle routes for PSDs and extracted feature data
self.psds = {}
self.feature_data = deque(maxlen = 2000)
if not options.disable_web_service:
bottle.route("/psds.xml")(self.web_get_psd_xml)
bottle.route("/feature_subset")(self.web_get_feature_data)
# set up kafka related properties
if options.use_kafka:
elif options.save_format == 'kafka':
self.kafka_partition = options.kafka_partition
self.kafka_topic = options.kafka_topic
self.kafka_conf = {'bootstrap.servers': options.kafka_server}
self.producer = Producer(self.kafka_conf)
elif options.save_format == 'bottle':
assert not options.disable_web_service, 'web service is not available to use bottle to transfer features'
self.feature_data = deque(maxlen = 2000)
bottle.route("/feature_subset")(self.web_get_feature_data)
# set up bottle routes for PSDs
self.psds = {}
if not options.disable_web_service:
bottle.route("/psds.xml")(self.web_get_psd_xml)
super(MultiChannelHandler, self).__init__(mainloop, pipeline, **kwargs)
def do_on_message(self, bus, message):
@@ -324,35 +320,45 @@ class MultiChannelHandler(simplehandler.Handler):
channel, rate = sink_dict[elem]
# push new stream event to queue if done processing current timestamp
if len(self.stream_event):
feature_subset = self.stream_event.pop()
if options.use_kafka:
self.producer.produce(timestamp = feature_subset['timestamp'], topic = self.kafka_topic, value = json.dumps(feature_subset))
else:
self.feature_data.append(feature_subset)
# set save time appropriately
if self.last_save_time is None:
self.last_save_time = buftime
self.last_persist_time = buftime
# Save triggers once per cadence
if idq_utils.in_new_epoch(buftime, self.last_save_time, self.cadence) or (buftime == self.feature_end_time):
logger.info("saving features to disk at timestamp = %d" % buftime)
if len(self.feature_queue):
feature_subset = self.feature_queue.pop()
self.timestamp = feature_subset['timestamp']
# set save times and initialize specific saving properties if not already set
if self.last_save_time is None:
self.last_save_time = self.timestamp
self.last_persist_time = self.timestamp
if options.save_format == 'hdf5':
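# size the first hdf5 file so that it ends at the next persist-cadence boundary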
duration = idq_utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
self.set_hdf_file_properties(self.timestamp, duration)
# Save triggers once per cadence if saving to disk
if options.save_format == 'hdf5' or options.save_format == 'ascii':
if self.timestamp and idq_utils.in_new_epoch(self.timestamp, self.last_save_time, self.cadence) or (self.timestamp == self.feature_end_time):
logger.info("saving features to disk at timestamp = %d" % self.timestamp)
if options.save_format == 'hdf5':
self.to_hdf_file()
elif options.save_format == 'ascii':
self.to_trigger_file(self.timestamp)
self.fdata.clear()
self.fdata.append(self.header)
self.last_save_time = self.timestamp
# persist triggers once per persist cadence if using hdf5 format
if options.save_format == 'hdf5':
self.to_hdf_file()
elif options.save_format == 'ascii':
self.to_trigger_file(buftime)
self.fdata.clear()
self.fdata.append(self.header)
self.last_save_time = buftime
# persist triggers once per persist cadence if using hdf5 format
if idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5':
logger.info("persisting features to disk at timestamp = %d" % buftime)
self.finish_hdf_file()
self.last_persist_time = buftime
self.set_hdf_file_properties(buftime, self.persist_cadence)
if self.timestamp and idq_utils.in_new_epoch(self.timestamp, self.last_persist_time, self.persist_cadence):
logger.info("persisting features to disk at timestamp = %d" % self.timestamp)
self.finish_hdf_file()
self.last_persist_time = self.timestamp
self.set_hdf_file_properties(self.timestamp, self.persist_cadence)
# add features to respective format specified
if options.save_format == 'kafka':
self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(feature_subset))
elif options.save_format == 'bottle':
self.feature_data.append(feature_subset)
elif options.save_format == 'hdf5':
self.fdata.append(self.timestamp, feature_subset['features'])
# read buffer contents
for i in range(buf.n_memory()):
@@ -407,16 +413,10 @@ class MultiChannelHandler(simplehandler.Handler):
timestamp = int(numpy.floor(trigger_time))
feature_row = {'timestamp':timestamp, 'channel':channel, 'start_time':start_time, 'stop_time':stop_time, 'snr':row.snr,
'trigger_time':trigger_time, 'frequency':freq, 'q':q, 'phase':row.phase, 'sigmasq':row.sigmasq, 'chisq':row.chisq}
self.stream_event.append(timestamp, channel, feature_row)
self.feature_queue.append(timestamp, channel, feature_row)
# save iDQ compatible data
if options.save_format == 'hdf5':
if self.aggregate_rate:
key = channel
else:
key = os.path.join(channel, str(rate).zfill(4))
self.fdata.append(feature_row, key = key, buftime = buftime)
elif options.save_format == 'ascii':
if options.save_format == 'ascii':
channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)
self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag))
@@ -463,6 +463,23 @@ class MultiChannelHandler(simplehandler.Handler):
tmp_path = os.path.join(self.tmp_path, self.fname)+".h5.tmp"
shutil.move(tmp_path, final_path)
def finalize(self):
"""
Clears out remaining features from the queue for saving to disk.
"""
# save remaining triggers
if options.save_format == 'hdf5':
self.feature_queue.flush()
while len(self.feature_queue):
feature_subset = self.feature_queue.pop()
self.fdata.append(feature_subset['timestamp'], feature_subset['features'])
self.to_hdf_file()
self.finish_hdf_file()
elif options.save_format == 'ascii':
self.to_trigger_file()
def set_hdf_file_properties(self, start_time, duration):
"""
Returns the file name, as well as locations of temporary and permanent locations of
@@ -630,13 +647,12 @@ def parse_command_line():
group = OptionGroup(parser, "Saving Options", "Adjust parameters used for saving/persisting features to disk as well as directories specified")
group.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
group.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Set the filename description in which to save the output.")
group.add_option("--save-format", metavar = "string", default = "hdf5", help = "Specifies the save format (ascii or hdf5) of features written to disk. Default = hdf5")
group.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
group.add_option("--persist-cadence", type = "int", default = 320, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Only used for live data, and needs to be a multiple of save cadence. Default = 320 seconds.")
group.add_option("--save-format", metavar = "string", default = "hdf5", help = "Specifies the save format (ascii/hdf5/kafka/bottle) of features written to disk. Default = hdf5")
group.add_option("--cadence", type = "int", default = 20, help = "Rate at which to write trigger files to disk. Default = 20 seconds.")
group.add_option("--persist-cadence", type = "int", default = 200, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Needs to be a multiple of save cadence. Default = 200 seconds.")
parser.add_option_group(group)
group = OptionGroup(parser, "Kafka Options", "Adjust settings used for pushing extracted features to a Kafka topic.")
group.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.")
group.add_option("--kafka-partition", metavar = "string", help = "If using Kafka, sets the partition that this feature extractor is assigned to.")
group.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this feature extractor publishes feature vector subsets to.")
group.add_option("--kafka-server", metavar = "string", help = "If using Kafka, sets the server url that the kafka topic is hosted on.")
@@ -700,7 +716,7 @@ basename = '%s-%s' % (instrument[:1], options.description)
waveforms = {}
# only load kafka library if triggers are transferred via kafka topic
if options.use_kafka:
if options.save_format == 'kafka':
from confluent_kafka import Producer
#
@@ -931,12 +947,8 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
mainloop.run()
# save remaining triggers
if options.save_format == 'hdf5':
handler.to_hdf_file()
handler.finish_hdf_file()
logger.info("persisting features to disk...")
elif options.save_format == 'ascii':
handler.to_trigger_file()
logger.info("persisting features to disk...")
handler.finalize()
#
# Shut down pipeline
......
@@ -217,91 +217,46 @@ class HDF5FeatureData(FeatureData):
"""
def __init__(self, columns, keys, **kwargs):
super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
self.padding = 1
self.cadence = kwargs.pop('cadence')
self.dtype = [(column, '<f8') for column in self.columns]
self.feature_data = {key: numpy.empty(((self.cadence+self.padding),), dtype = self.dtype) for key in keys}
self.last_save_time = None
self.feature_data = {key: numpy.empty((self.cadence,), dtype = self.dtype) for key in keys}
self.last_save_time = 0
self.clear()
def dump(self, path, base, start_time, key = None, tmp = False):
def dump(self, path, base, start_time, tmp = False):
"""
Saves the current cadence of gps triggers to disk and clear out data
Saves the current cadence of features to disk and clears out the data
"""
name = "%d_%d" % (start_time, self.cadence)
if key:
for key in self.keys:
create_new_dataset(path, base, self.feature_data[key], name=name, group=key, tmp=tmp)
self.clear(key)
else:
for key in self.keys:
create_new_dataset(path, base, self.feature_data[key], name=name, group=key, tmp=tmp)
self.clear()
def append(self, value, key = None, buftime = None):
"""
Append a trigger row to data structure
"""
if buftime and key:
self.last_save_time = floor_div(buftime, self.cadence)
idx = int(numpy.floor(value['trigger_time'])) - self.last_save_time
if numpy.isnan(self.feature_data[key][idx][self.columns[0]]) or (value['snr'] > self.feature_data[key][idx]['snr']):
self.feature_data[key][idx] = numpy.array(tuple(value[col] for col in self.columns), dtype=self.dtype)
def clear(self, key = None):
if key:
self.feature_data[key][:] = numpy.nan
else:
for key in self.keys:
self.feature_data[key][:] = numpy.nan
class HDF5SeriesFeatureData(FeatureData):
"""!
Saves feature data with varying dataset lengths to hdf5.
"""
def __init__(self, columns, keys, **kwargs):
super(HDF5SeriesFeatureData, self).__init__(columns, keys = keys, **kwargs)
self.cadence = kwargs.pop('cadence')
self.dtype = [(column, '<f8') for column in self.columns]
self.feature_data = {key: [] for key in keys}
self.clear()
def dump(self, path, base, start_time, key = None, tmp = False):
def append(self, timestamp, features):
"""
Saves the current cadence of gps triggers to disk and clear out data
Append a feature buffer to data structure
"""
name = "%d_%d" % (start_time, self.cadence)
if key:
create_new_dataset(path, base, numpy.array(self.feature_data[key], dtype=self.dtype), name=name, group=key, tmp=tmp)
self.clear(key)
else:
for key in self.keys:
create_new_dataset(path, base, numpy.array(self.feature_data[key], dtype=self.dtype), name=name, group=key, tmp=tmp)
self.clear()
self.last_save_time = floor_div(timestamp, self.cadence)
idx = timestamp - self.last_save_time
def append(self, value, key = None, buftime = None):
"""
Append a trigger row to data structure
"""
if buftime and key:
self.feature_data[key].append(tuple(value[col] for col in self.columns))
### FIXME: assumes there is just one row per channel for now (denoting a sample rate of 1Hz)
for key in features.keys():
if features[key][0]:
self.feature_data[key][idx] = numpy.array(tuple(features[key][0][col] for col in self.columns), dtype=self.dtype)
def clear(self, key = None):
if key:
self.feature_data[key] = []
else:
for key in self.keys:
self.feature_data[key] = []
def clear(self):
for key in self.keys:
self.feature_data[key][:] = numpy.nan
class FeatureQueue(object):
"""
Class for storing feature data.
NOTE: assumes that ingested features are time ordered.
"""
def __init__(self, channels, columns, sample_rate, num_samples):
def __init__(self, channels, columns, sample_rate):
self.channels = channels
self.columns = columns
self.sample_rate = sample_rate
self.num_samples = num_samples
self.out_queue = deque(maxlen = 5)
self.in_queue = {}
self.counter = Counter()
@@ -331,6 +286,12 @@ class FeatureQueue(object):
if len(self):
return self.out_queue.popleft()
def flush(self):
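"""
Flushes any remaining feature buffers from the in-queue to the out-queue, oldest timestamp first.
"""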
while self.in_queue:
oldest_timestamp = min(self.counter.keys())
del self.counter[oldest_timestamp]
self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})
def _create_buffer(self):
return {channel: [None for x in range(self.sample_rate)] for channel in self.channels}
......