diff --git a/gstlal-ugly/bin/gstlal_feature_extractor b/gstlal-ugly/bin/gstlal_feature_extractor index 4c0c9e1f81b04d72990b70850da7f36f2b099024..845187f21115284a1e34431d279acb617a9a83c4 100755 --- a/gstlal-ugly/bin/gstlal_feature_extractor +++ b/gstlal-ugly/bin/gstlal_feature_extractor @@ -217,12 +217,14 @@ class MultiChannelHandler(simplehandler.Handler): self.instrument = data_source_info.instrument self.frame_segments = data_source_info.frame_segments self.keys = kwargs.pop("keys") + self.num_samples = len(self.keys) + self.sample_rate = 1 # NOTE: hard-coded for now self.waveforms = kwargs.pop("waveforms") self.basename = kwargs.pop("basename") self.waveform_type = options.waveform # format keys used for saving, etc. - self.aggregate_rate = True + self.aggregate_rate = True # NOTE: hard-coded for now if self.aggregate_rate: self.keys = list(set([key[0] for key in self.keys])) else: @@ -251,17 +253,11 @@ class MultiChannelHandler(simplehandler.Handler): # row properties columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq'] - self.Row = namedtuple('Row', columns) # feature saving properties if options.save_format == 'hdf5': self.fdata = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence) - - if self.is_live: - duration = idq_utils.floor_div(self.feature_start_time + self.persist_cadence, self.persist_cadence) - self.feature_start_time - else: - duration = self.feature_end_time - self.feature_start_time - + duration = idq_utils.floor_div(self.feature_start_time + self.persist_cadence, self.persist_cadence) - self.feature_start_time self.set_hdf_file_properties(self.feature_start_time, duration) elif options.save_format == 'ascii': @@ -271,8 +267,7 @@ class MultiChannelHandler(simplehandler.Handler): self.fdata.append(self.header) # set up stream related properties - self.stream_event = deque(maxlen = 20000) - self.last_stream_event_time = None + self.stream_event = idq_utils.FeatureQueue(self.keys, columns, self.sample_rate, self.num_samples) # set up bottle routes for PSDs and extracted feature data self.psds = {} @@ -329,16 +324,12 @@ class MultiChannelHandler(simplehandler.Handler): channel, rate = sink_dict[elem] # push new stream event to queue if done processing current timestamp - if self.last_stream_event_time is None: - self.last_stream_event_time = buftime - if self.last_stream_event_time < buftime: - feature_subset = {'timestamp': self.last_stream_event_time, 'feature_data': list(self.stream_event)} + if len(self.stream_event): + feature_subset = self.stream_event.pop() if options.use_kafka: - self.producer.produce(timestamp = self.last_stream_event_time, topic = self.kafka_topic, value = json.dumps(feature_subset)) + self.producer.produce(timestamp = feature_subset['timestamp'], topic = self.kafka_topic, value = json.dumps(feature_subset)) else: self.feature_data.append(feature_subset) - self.stream_event.clear() - self.last_stream_event_time = buftime # set save time appropriately if self.last_save_time is None: @@ -356,8 +347,8 @@ class MultiChannelHandler(simplehandler.Handler): self.fdata.append(self.header) self.last_save_time = buftime - # persist triggers once per persist cadence if using hdf5 format and running with live data - if self.is_live and idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5': + # persist triggers once per persist cadence if using hdf5 format + if idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5': logger.info("persisting features to disk at timestamp = %d" % buftime) self.finish_hdf_file() self.last_persist_time = buftime @@ -413,9 +404,10 @@ class MultiChannelHandler(simplehandler.Handler): stop_time = trigger_time # append row for data transfer/saving - feature_row = self.Row(start_time=start_time, stop_time=stop_time, trigger_time=trigger_time, - frequency=freq, q=q, phase=row.phase, sigmasq=row.sigmasq, chisq=row.chisq, snr=row.snr) - self.stream_event.append(feature_row._asdict().update({'timestamp':buftime, 'channel':channel, 'rate':rate})) + timestamp = int(numpy.floor(trigger_time)) + feature_row = {'timestamp':timestamp, 'channel':channel, 'start_time':start_time, 'stop_time':stop_time, 'snr':row.snr, + 'trigger_time':trigger_time, 'frequency':freq, 'q':q, 'phase':row.phase, 'sigmasq':row.sigmasq, 'chisq':row.chisq} + self.stream_event.append(timestamp, channel, feature_row) # save iDQ compatible data if options.save_format == 'hdf5': diff --git a/gstlal-ugly/python/idq_utils.py b/gstlal-ugly/python/idq_utils.py index f283213ddea3b9572b3ca3a1015bc4d73c96340c..fcd7cd8f8979fb3da6900f8669a3c52077f3a6a2 100644 --- a/gstlal-ugly/python/idq_utils.py +++ b/gstlal-ugly/python/idq_utils.py @@ -26,7 +26,7 @@ import bisect -from collections import defaultdict +from collections import Counter, defaultdict, deque import glob import logging import os @@ -217,10 +217,10 @@ class HDF5FeatureData(FeatureData): """ def __init__(self, columns, keys, **kwargs): super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs) - self.latency = 1 + self.padding = 1 self.cadence = kwargs.pop('cadence') self.dtype = [(column, '<f8') for column in self.columns] - self.feature_data = {key: numpy.empty(((self.cadence+self.latency),), dtype = self.dtype) for key in keys} + self.feature_data = {key: numpy.empty(((self.cadence+self.padding),), dtype = self.dtype) for key in keys} self.last_save_time = None self.clear() @@ -243,9 +243,9 @@ class HDF5FeatureData(FeatureData): """ if buftime and key: self.last_save_time = floor_div(buftime, self.cadence) - idx = int(numpy.floor(value.trigger_time)) - self.last_save_time - if numpy.isnan(self.feature_data[key][idx][self.columns[0]]) or (value.snr > self.feature_data[key][idx]['snr']): - self.feature_data[key][idx] = numpy.array(value, dtype=self.dtype) + idx = int(numpy.floor(value['trigger_time'])) - self.last_save_time + if numpy.isnan(self.feature_data[key][idx][self.columns[0]]) or (value['snr'] > self.feature_data[key][idx]['snr']): + self.feature_data[key][idx] = numpy.array(tuple(value[col] for col in self.columns), dtype=self.dtype) def clear(self, key = None): if key: @@ -283,7 +283,7 @@ class HDF5SeriesFeatureData(FeatureData): Append a trigger row to data structure """ if buftime and key: - self.feature_data[key].append(value) + self.feature_data[key].append(tuple(value[col] for col in self.columns)) def clear(self, key = None): if key: @@ -292,6 +292,54 @@ class HDF5SeriesFeatureData(FeatureData): for key in self.keys: self.feature_data[key] = [] +class FeatureQueue(object): + """ + Class for storing feature data. + NOTE: assumes that ingested features are time ordered. + """ + def __init__(self, channels, columns, sample_rate, num_samples): + self.channels = channels + self.columns = columns + self.sample_rate = sample_rate + self.num_samples = num_samples + self.out_queue = deque(maxlen = 5) + self.in_queue = {} + self.counter = Counter() + self.last_timestamp = 0 + self.effective_latency = 2 + + def append(self, timestamp, channel, row): + if timestamp > self.last_timestamp: + ### create new buffer if one isn't available for new timestamp + if timestamp not in self.in_queue: + self.in_queue[timestamp] = self._create_buffer() + self.counter[timestamp] += 1 + + ### store row, aggregating if necessary + idx = self._idx(timestamp) + if not self.in_queue[timestamp][channel][idx] or (row['snr'] > self.in_queue[timestamp][channel][idx]['snr']): + self.in_queue[timestamp][channel][idx] = row + + ### check if there's enough new samples that the oldest sample needs to be pushed + if len(self.counter) > self.effective_latency: + oldest_timestamp = min(self.counter.keys()) + self.last_timestamp = oldest_timestamp + self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)}) + del self.counter[oldest_timestamp] + + def pop(self): + if len(self): + return self.out_queue.popleft() + + def _create_buffer(self): + return {channel: [None for x in range(self.sample_rate)] for channel in self.channels} + + def _idx(self, timestamp): + return int(numpy.floor((timestamp % 1) * self.sample_rate)) + + def __len__(self): + return len(self.out_queue) + #---------------------------------- ### structures to generate basis waveforms