Skip to content
Snippets Groups Projects
Commit 6e176b07 authored by Patrick Godwin's avatar Patrick Godwin Committed by Kipp Cannon
Browse files

feature_extractor.py: allow buffer in queue to be smaller than 1 sec to reduce...

feature_extractor.py: allow buffer in queue to be smaller than 1 sec to reduce overall latency, fix utils.floor_div to allow non-integer floor division
parent 9d73a2c0
No related branches found
No related tags found
No related merge requests found
......@@ -115,13 +115,6 @@ class MultiChannelHandler(simplehandler.Handler):
self.feature_end_time = options.feature_end_time
self.columns = ['trigger_time', 'frequency', 'q', 'snr', 'phase']
### set up queue to cache features depending on pipeline mode
self.feature_mode = options.feature_mode
if self.feature_mode == 'timeseries':
self.feature_queue = utils.TimeseriesFeatureQueue(self.keys, self.columns, sample_rate = self.sample_rate)
elif self.feature_mode == 'etg':
self.feature_queue = utils.ETGFeatureQueue(self.keys, self.columns)
# set whether data source is live
self.is_live = data_source_info.data_source in data_source_info.live_sources
......@@ -131,8 +124,22 @@ class MultiChannelHandler(simplehandler.Handler):
else:
self.tmp_dir = os.environ['TMPDIR']
# feature saving properties
### feature saving properties
self.save_format = options.save_format
# set queue buffer size based on file format
if self.save_format == 'hdf5':
self.buffer_size = 1 ### 1 second buffers for file-based formats
else:
self.buffer_size = 1. / self.sample_rate
# set up queue to cache features depending on pipeline mode
self.feature_mode = options.feature_mode
if self.feature_mode == 'timeseries':
self.feature_queue = utils.TimeseriesFeatureQueue(self.keys, self.columns, sample_rate = self.sample_rate, buffer_size = self.buffer_size)
elif self.feature_mode == 'etg':
self.feature_queue = utils.ETGFeatureQueue(self.keys, self.columns)
if self.save_format == 'hdf5':
if self.feature_mode == 'timeseries':
self.fdata = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.cadence, sample_rate = self.sample_rate, waveform = self.waveform_type)
......@@ -279,7 +286,7 @@ class MultiChannelHandler(simplehandler.Handler):
trigger_time = row.end_time + row.end_time_ns * 1e-9
# append row for data transfer/saving
timestamp = int(numpy.floor(trigger_time))
timestamp = utils.floor_div(trigger_time, self.buffer_size)
feature_row = {'channel':channel, 'snr':row.snr, 'trigger_time':trigger_time, 'frequency':waveform['frequency'], 'q':waveform['q'], 'phase':row.phase}
self.feature_queue.append(timestamp, channel, feature_row)
......
......@@ -118,7 +118,7 @@ def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
def floor_div(x, n):
"""
Floor an integer by removing its remainder
from integer division by another integer n.
from the nearest value n.
>>> floor_div(163, 10)
160
......@@ -127,7 +127,7 @@ def floor_div(x, n):
"""
assert n > 0
return (x / n) * n
return (x // n) * n
def gps2latency(gps_time):
"""
......@@ -341,11 +341,12 @@ class TimeseriesFeatureQueue(object):
self.channels = channels
self.columns = columns
self.sample_rate = kwargs.pop('sample_rate')
self.buffer_size = kwargs.pop('buffer_size')
self.out_queue = deque(maxlen = 5)
self.in_queue = {}
self.counter = Counter()
self.last_timestamp = 0
self.effective_latency = 2
self.effective_latency = 2 # NOTE: set so that late features are not dropped
def append(self, timestamp, channel, row):
if timestamp > self.last_timestamp:
......@@ -377,10 +378,10 @@ class TimeseriesFeatureQueue(object):
self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})
def _create_buffer(self):
return defaultdict(lambda: [None for ii in range(self.sample_rate)])
return defaultdict(lambda: [None for ii in range(int(self.sample_rate * self.buffer_size))])
def _idx(self, timestamp):
return int(numpy.floor((timestamp % 1) * self.sample_rate))
return int(numpy.floor(((timestamp / self.buffer_size) % 1) * self.buffer_size *self.sample_rate))
def __len__(self):
return len(self.out_queue)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment