Commit 37b8d6d2 authored by Patrick Godwin

gstlal_feature_extractor: changed file saving to always persist features once per persist_cadence, rather than only when running over live data. Changed how features are pushed to Kafka so that timestamps are aggregated correctly and depend on trigger time rather than buffer time.
parent 0091fe01
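One note on the first change below: HDF5 file boundaries are now always aligned to persist_cadence, for live and offline data alike, by shortening the first file. A minimal sketch of the duration arithmetic, assuming (from its usage in this commit) that idq_utils.floor_div(x, n) rounds x down to the nearest integer multiple of n; the GPS times are invented:

# assumed behavior of idq_utils.floor_div, inferred from its usage below
def floor_div(x, n):
	return (x // n) * n

feature_start_time = 1187000123   # hypothetical GPS start time
persist_cadence = 200             # hypothetical file rollover cadence, in seconds

# the first file is shortened to end on the next cadence boundary,
# so every subsequent file spans exactly one full persist_cadence
duration = floor_div(feature_start_time + persist_cadence, persist_cadence) - feature_start_time
print(duration)   # 77: the first file ends at GPS time 1187000200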
@@ -217,12 +217,14 @@ class MultiChannelHandler(simplehandler.Handler):
 		self.instrument = data_source_info.instrument
 		self.frame_segments = data_source_info.frame_segments
 		self.keys = kwargs.pop("keys")
+		self.num_samples = len(self.keys)
+		self.sample_rate = 1 # NOTE: hard-coded for now
 		self.waveforms = kwargs.pop("waveforms")
 		self.basename = kwargs.pop("basename")
 		self.waveform_type = options.waveform
 		# format keys used for saving, etc.
-		self.aggregate_rate = True
+		self.aggregate_rate = True # NOTE: hard-coded for now
 		if self.aggregate_rate:
 			self.keys = list(set([key[0] for key in self.keys]))
 		else:
@@ -251,17 +253,11 @@ class MultiChannelHandler(simplehandler.Handler):
 		# row properties
 		columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
 		self.Row = namedtuple('Row', columns)

 		# feature saving properties
 		if options.save_format == 'hdf5':
 			self.fdata = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
-			if self.is_live:
-				duration = idq_utils.floor_div(self.feature_start_time + self.persist_cadence, self.persist_cadence) - self.feature_start_time
-			else:
-				duration = self.feature_end_time - self.feature_start_time
+			duration = idq_utils.floor_div(self.feature_start_time + self.persist_cadence, self.persist_cadence) - self.feature_start_time
 			self.set_hdf_file_properties(self.feature_start_time, duration)
 		elif options.save_format == 'ascii':
@@ -271,8 +267,7 @@ class MultiChannelHandler(simplehandler.Handler):
 			self.fdata.append(self.header)

 		# set up stream related properties
-		self.stream_event = deque(maxlen = 20000)
-		self.last_stream_event_time = None
+		self.stream_event = idq_utils.FeatureQueue(self.keys, columns, self.sample_rate, self.num_samples)
 		# set up bottle routes for PSDs and extracted feature data
 		self.psds = {}
@@ -329,16 +324,12 @@ class MultiChannelHandler(simplehandler.Handler):
 			channel, rate = sink_dict[elem]

 			# push new stream event to queue if done processing current timestamp
-			if self.last_stream_event_time is None:
-				self.last_stream_event_time = buftime
-			if self.last_stream_event_time < buftime:
-				feature_subset = {'timestamp': self.last_stream_event_time, 'feature_data': list(self.stream_event)}
+			if len(self.stream_event):
+				feature_subset = self.stream_event.pop()
 				if options.use_kafka:
-					self.producer.produce(timestamp = self.last_stream_event_time, topic = self.kafka_topic, value = json.dumps(feature_subset))
+					self.producer.produce(timestamp = feature_subset['timestamp'], topic = self.kafka_topic, value = json.dumps(feature_subset))
 				else:
 					self.feature_data.append(feature_subset)
-				self.stream_event.clear()
-				self.last_stream_event_time = buftime

 			# set save time appropriately
 			if self.last_save_time is None:
@@ -356,8 +347,8 @@ class MultiChannelHandler(simplehandler.Handler):
 					self.fdata.append(self.header)
 				self.last_save_time = buftime
-			# persist triggers once per persist cadence if using hdf5 format and running with live data
-			if self.is_live and idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5':
+			# persist triggers once per persist cadence if using hdf5 format
+			if idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5':
 				logger.info("persisting features to disk at timestamp = %d" % buftime)
 				self.finish_hdf_file()
 				self.last_persist_time = buftime
@@ -413,9 +404,10 @@ class MultiChannelHandler(simplehandler.Handler):
 					stop_time = trigger_time

 				# append row for data transfer/saving
-				feature_row = self.Row(start_time=start_time, stop_time=stop_time, trigger_time=trigger_time,
-					frequency=freq, q=q, phase=row.phase, sigmasq=row.sigmasq, chisq=row.chisq, snr=row.snr)
-				self.stream_event.append(feature_row._asdict().update({'timestamp':buftime, 'channel':channel, 'rate':rate}))
+				timestamp = int(numpy.floor(trigger_time))
+				feature_row = {'timestamp':timestamp, 'channel':channel, 'start_time':start_time, 'stop_time':stop_time, 'snr':row.snr,
+					'trigger_time':trigger_time, 'frequency':freq, 'q':q, 'phase':row.phase, 'sigmasq':row.sigmasq, 'chisq':row.chisq}
+				self.stream_event.append(timestamp, channel, feature_row)

 				# save iDQ compatible data
 				if options.save_format == 'hdf5':
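The handler change above and the utils changes below share the same aggregation rule: rows are binned by the integer second of their trigger time (not the buffer time), and within a bin a new row only displaces the stored one if its SNR is larger. A self-contained illustration of that rule; channel names and values are invented:

# keep-loudest reduction per (channel, integer trigger-time second) bin
rows = [
	{'channel': 'H1:FOO', 'trigger_time': 1187000000.25, 'snr': 5.0},
	{'channel': 'H1:FOO', 'trigger_time': 1187000000.75, 'snr': 9.0},   # displaces the 5.0 row
	{'channel': 'H1:FOO', 'trigger_time': 1187000001.10, 'snr': 4.0},
]

binned = {}
for row in rows:
	key = (row['channel'], int(row['trigger_time']))   # bin by trigger time, not buffer time
	if key not in binned or row['snr'] > binned[key]['snr']:
		binned[key] = row

for key in sorted(binned):
	print(key, binned[key]['snr'])
# ('H1:FOO', 1187000000) 9.0
# ('H1:FOO', 1187000001) 4.0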
@@ -26,7 +26,7 @@
 import bisect
-from collections import defaultdict
+from collections import Counter, defaultdict, deque
 import glob
 import logging
 import os
@@ -217,10 +217,10 @@ class HDF5FeatureData(FeatureData):
 	"""
 	def __init__(self, columns, keys, **kwargs):
 		super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
-		self.latency = 1
+		self.padding = 1
 		self.cadence = kwargs.pop('cadence')
 		self.dtype = [(column, '<f8') for column in self.columns]
-		self.feature_data = {key: numpy.empty(((self.cadence+self.latency),), dtype = self.dtype) for key in keys}
+		self.feature_data = {key: numpy.empty(((self.cadence+self.padding),), dtype = self.dtype) for key in keys}
 		self.last_save_time = None
 		self.clear()
@@ -243,9 +243,9 @@ class HDF5FeatureData(FeatureData):
 		"""
 		if buftime and key:
 			self.last_save_time = floor_div(buftime, self.cadence)
-			idx = int(numpy.floor(value.trigger_time)) - self.last_save_time
-			if numpy.isnan(self.feature_data[key][idx][self.columns[0]]) or (value.snr > self.feature_data[key][idx]['snr']):
-				self.feature_data[key][idx] = numpy.array(value, dtype=self.dtype)
+			idx = int(numpy.floor(value['trigger_time'])) - self.last_save_time
+			if numpy.isnan(self.feature_data[key][idx][self.columns[0]]) or (value['snr'] > self.feature_data[key][idx]['snr']):
+				self.feature_data[key][idx] = numpy.array(tuple(value[col] for col in self.columns), dtype=self.dtype)

 	def clear(self, key = None):
 		if key:
@@ -283,7 +283,7 @@ class HDF5SeriesFeatureData(FeatureData):
 		Append a trigger row to data structure
 		"""
 		if buftime and key:
-			self.feature_data[key].append(value)
+			self.feature_data[key].append(tuple(value[col] for col in self.columns))

 	def clear(self, key = None):
 		if key:
@@ -292,6 +292,54 @@ class HDF5SeriesFeatureData(FeatureData):
 		for key in self.keys:
 			self.feature_data[key] = []

+class FeatureQueue(object):
+	"""
+	Class for storing feature data.
+	NOTE: assumes that ingested features are time ordered.
+	"""
+	def __init__(self, channels, columns, sample_rate, num_samples):
+		self.channels = channels
+		self.columns = columns
+		self.sample_rate = sample_rate
+		self.num_samples = num_samples
+		self.out_queue = deque(maxlen = 5)
+		self.in_queue = {}
+		self.counter = Counter()
+		self.last_timestamp = 0
+		self.effective_latency = 2
+
+	def append(self, timestamp, channel, row):
+		if timestamp > self.last_timestamp:
+			### create new buffer if one isn't available for new timestamp
+			if timestamp not in self.in_queue:
+				self.in_queue[timestamp] = self._create_buffer()
+				self.counter[timestamp] += 1
+
+			### store row, aggregating if necessary
+			idx = self._idx(timestamp)
+			if not self.in_queue[timestamp][channel][idx] or (row['snr'] > self.in_queue[timestamp][channel][idx]['snr']):
+				self.in_queue[timestamp][channel][idx] = row
+
+			### check if there are enough new samples that the oldest sample needs to be pushed
+			if len(self.counter) > self.effective_latency:
+				oldest_timestamp = min(self.counter.keys())
+				self.last_timestamp = oldest_timestamp
+				self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})
+				del self.counter[oldest_timestamp]
+
+	def pop(self):
+		if len(self):
+			return self.out_queue.popleft()
+
+	def _create_buffer(self):
+		return {channel: [None for x in range(self.sample_rate)] for channel in self.channels}
+
+	def _idx(self, timestamp):
+		return int(numpy.floor((timestamp % 1) * self.sample_rate))
+
+	def __len__(self):
+		return len(self.out_queue)
+

 #----------------------------------
 ### structures to generate basis waveforms
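As a usage sketch of the FeatureQueue added above (channel names and SNR values are invented; sample_rate = 1 matches the value hard-coded in the handler): with effective_latency = 2, a timestamp is only flushed to the output queue once two newer integer-second timestamps have been seen, which is what lets the Kafka messages carry trigger-derived timestamps.

# assumes the FeatureQueue class above is in scope, along with its
# Counter/deque/numpy imports
channels = ['H1:FOO', 'H1:BAR']
columns = ['trigger_time', 'snr']
queue = FeatureQueue(channels, columns, sample_rate = 1, num_samples = len(channels))

queue.append(1000, 'H1:FOO', {'trigger_time': 1000.4, 'snr': 6.0})
queue.append(1000, 'H1:FOO', {'trigger_time': 1000.9, 'snr': 8.0})   # higher SNR wins the slot
queue.append(1001, 'H1:BAR', {'trigger_time': 1001.2, 'snr': 5.0})
print(len(queue))   # 0: only two pending timestamps, nothing flushed yet

queue.append(1002, 'H1:FOO', {'trigger_time': 1002.1, 'snr': 4.0})
print(len(queue))   # 1: pending timestamps now exceed effective_latency, so 1000 is flushed

subset = queue.pop()
print(subset['timestamp'])                 # 1000
print(subset['features']['H1:FOO'][0])     # {'trigger_time': 1000.9, 'snr': 8.0}
print(subset['features']['H1:BAR'][0])     # None: no trigger arrived for this channel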