Commit bf417568 authored by Patrick Godwin

feature extraction: change trigger_time -> time, remove nan rows, specify true spans of datasets to comply with gwtrigfind
parent 423428ae
@@ -125,8 +125,14 @@ class HDF5StreamSink(object):
 self.persist_cadence = options.persist_cadence
 self.waveform = options.waveform
 self.basename = '%s-%s' % (options.instrument[:1], options.basename)
-self.columns = ['trigger_time', 'frequency', 'q', 'snr', 'phase']
-self.feature_data = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.write_cadence, sample_rate = self.sample_rate, waveform = self.waveform)
+self.columns = ['time', 'frequency', 'q', 'snr', 'phase', 'duration']
+self.feature_data = utils.HDF5TimeseriesFeatureData(
+    self.columns,
+    keys = self.keys,
+    cadence = self.write_cadence,
+    sample_rate = self.sample_rate,
+    waveform = self.waveform
+)

 ### get base temp directory
 if '_CONDOR_SCRATCH_DIR' in os.environ:
@@ -188,13 +194,14 @@
 if self.last_save_time is None:
     self.last_save_time = self.timestamp
     self.last_persist_time = self.timestamp
-    duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
+    duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp + 1
     self.set_hdf_file_properties(self.timestamp, duration)

 # Save triggers once per cadence if saving to disk
 if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
     logger.info("saving features to disk at timestamp = %f" % self.timestamp)
-    self.feature_data.dump(self.tmp_path, self.feature_name, utils.floor_div(self.last_save_time, self.write_cadence), tmp = True)
+    save_time = utils.floor_div(self.last_save_time, self.write_cadence)
+    self.feature_data.dump(self.tmp_path, self.feature_name, save_time, tmp = True)
     self.last_save_time = self.timestamp

 # persist triggers once per persist cadence if using hdf5 format
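For reference, a minimal sketch of what the duration arithmetic above computes, assuming utils.floor_div(x, n) rounds x down to the nearest multiple of n (the stand-in function and GPS values here are illustrative, not from the diff):

    def floor_div(x, n):
        # stand-in for utils.floor_div: round x down to a multiple of n
        return (x // n) * n

    # e.g. a stream starting mid-epoch with a 20 s persist cadence:
    timestamp, persist_cadence = 1186700075, 20
    duration = floor_div(timestamp + persist_cadence, persist_cadence) - timestamp + 1
    # floor_div(1186700095, 20) = 1186700080, so duration = 6 rather than 5:
    # the span runs through the final second before the next persist boundary,
    # so the file advertises the true extent of the data it holds.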
@@ -113,7 +113,7 @@ class MultiChannelHandler(simplehandler.Handler):
 self.persist_cadence = options.persist_cadence
 self.feature_start_time = options.feature_start_time
 self.feature_end_time = options.feature_end_time
-self.columns = ['trigger_time', 'frequency', 'q', 'snr', 'phase']
+self.columns = ['timestamp', 'time', 'snr', 'phase', 'frequency', 'q', 'duration']

 # set whether data source is live
 self.is_live = data_source_info.data_source in data_source_info.live_sources
@@ -129,22 +129,39 @@ class MultiChannelHandler(simplehandler.Handler):
 # set queue buffer size based on file format
 if self.save_format == 'hdf5':
-    self.buffer_size = 1 ### 1 second buffers for file-based formats
+    self.buffer_size = 1. ### 1 second buffers for file-based formats
 else:
     self.buffer_size = 1. / self.sample_rate

 # set up queue to cache features depending on pipeline mode
 self.feature_mode = options.feature_mode
 if self.feature_mode == 'timeseries':
-    self.feature_queue = utils.TimeseriesFeatureQueue(self.keys, self.columns, sample_rate = self.sample_rate, buffer_size = self.buffer_size)
+    self.feature_queue = utils.TimeseriesFeatureQueue(
+        self.keys,
+        self.columns,
+        sample_rate = self.sample_rate,
+        buffer_size = self.buffer_size
+    )
 elif self.feature_mode == 'etg':
     self.feature_queue = utils.ETGFeatureQueue(self.keys, self.columns)

 # set up structure to store feature data
 if self.save_format == 'hdf5':
     if self.feature_mode == 'timeseries':
-        self.fdata = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.cadence, sample_rate = self.sample_rate, waveform = self.waveform_type)
+        self.fdata = utils.HDF5TimeseriesFeatureData(
+            self.columns,
+            keys = self.keys,
+            cadence = self.cadence,
+            sample_rate = self.sample_rate,
+            waveform = self.waveform_type
+        )
     elif self.feature_mode == 'etg':
-        self.fdata = utils.HDF5ETGFeatureData(self.columns, keys = self.keys, cadence = self.cadence, waveform = self.waveform_type)
+        self.fdata = utils.HDF5ETGFeatureData(
+            self.columns,
+            keys = self.keys,
+            cadence = self.cadence,
+            waveform = self.waveform_type
+        )
     else:
         raise KeyError, 'not a valid feature mode option'
@@ -216,13 +233,13 @@ class MultiChannelHandler(simplehandler.Handler):
     self.last_save_time = self.timestamp
     self.last_persist_time = self.timestamp
     if self.save_format == 'hdf5':
-        duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
+        duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp + 1
         self.set_hdf_file_properties(self.timestamp, duration)

 # Save triggers once per cadence if saving to disk
 if self.save_format == 'hdf5':
     if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.cadence) or (self.timestamp == self.feature_end_time):
-        self.logger.info("saving features to disk at timestamp = %d, latency = %.3f" % (self.timestamp, utils.gps2latency(self.timestamp)))
+        self.logger.info("saving features to disk at timestamp = %d" % self.timestamp)
         self.save_features()
         self.last_save_time = self.timestamp
@@ -237,12 +254,13 @@ class MultiChannelHandler(simplehandler.Handler):
 # add features to respective format specified
 if self.save_format == 'kafka':
     if self.data_transfer == 'table':
+        self.logger.info("pushing features to disk at timestamp = %.3f, latency = %.3f" % (self.timestamp, utils.gps2latency(self.timestamp)))
         self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(feature_subset))
     elif self.data_transfer == 'row':
         for row in itertools.chain(*feature_subset['features'].values()):
             if row:
                 self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(row))
         self.logger.info("pushing features to disk at timestamp = %.3f, latency = %.3f" % (self.timestamp, utils.gps2latency(self.timestamp)))
     self.producer.poll(0) ### flush out queue of sent packets
 elif self.save_format == 'bottle':
     self.feature_data.append(feature_subset)
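For context, a minimal standalone sketch of the 'row' transfer path above, assuming a confluent-kafka Producer; the broker address, topic name, and feature row are illustrative and do not come from the diff:

    import json
    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    row = {'channel': 'H1:FAKE-CHANNEL_16K', 'time': 1186700075.25,
           'snr': 5.2, 'phase': 0.3, 'frequency': 128.0, 'q': 11.3,
           'duration': 0.08}
    producer.produce('gstlal_features', value=json.dumps(row))
    producer.poll(0)   # serve delivery callbacks without blocking
    producer.flush()   # block until queued messages are delivered

Calling poll(0) after each batch, as the handler does, keeps the delivery-report queue drained without stalling the streaming pipeline.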
@@ -287,8 +305,17 @@ class MultiChannelHandler(simplehandler.Handler):
 trigger_time = row.end_time + row.end_time_ns * 1e-9

 # append row for data transfer/saving
+feature_row = {
+    'timestamp': utils.floor_div(buftime, 1. / self.sample_rate),
+    'channel': channel,
+    'snr': row.snr,
+    'phase': row.phase,
+    'time': trigger_time,
+    'frequency': waveform['frequency'],
+    'q': waveform['q'],
+    'duration': waveform['duration'],
+}
 timestamp = utils.floor_div(buftime, self.buffer_size)
-feature_row = {'channel':channel, 'snr':row.snr, 'trigger_time':trigger_time, 'frequency':waveform['frequency'], 'q':waveform['q'], 'phase':row.phase}
 self.feature_queue.append(timestamp, channel, feature_row)

 def save_features(self):
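The two floor_div calls above bin each trigger twice: the row-level 'timestamp' snaps the buffer time onto the sample grid, while the queue-level timestamp selects which buffer the row belongs to. A rough sketch of the distinction, under the same floor_div assumption as before (values illustrative):

    def floor_div(x, n):
        return (x // n) * n  # stand-in for utils.floor_div

    buftime, buffer_size, sample_rate = 1186700075.3, 1., 16
    queue_timestamp = floor_div(buftime, buffer_size)      # 1186700075.0
    row_timestamp = floor_div(buftime, 1. / sample_rate)   # 1186700075.25
    # rows with distinct sample-grid timestamps can share one queue buffer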
@@ -27,7 +27,9 @@
 from collections import Counter, defaultdict, deque
 import glob
 import itertools
 import logging
+import operator
+import os
 import sys
 import timeit
@@ -218,6 +220,17 @@ def gen_formatter():
     """
     return logging.Formatter('%(asctime)s | %(name)s : %(levelname)s : %(message)s')

+
+#----------------------------------
+### other utilities
+
+def group_indices(indices):
+    """
+    Given a list of indices, groups up indices into contiguous groups.
+    """
+    for k, group in itertools.groupby(enumerate(indices), lambda (i, x): i - x):
+        yield map(operator.itemgetter(1), group)
+
 ####################
 #
 # classes
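The tuple-unpacking lambda in group_indices is Python 2-only syntax. An equivalent sketch that also runs under Python 3 (a reimplementation for illustration, not part of the commit):

    from itertools import groupby

    def group_indices(indices):
        # consecutive indices share the same (position - value) key,
        # so each group is one contiguous run
        for _, group in groupby(enumerate(indices), lambda pair: pair[0] - pair[1]):
            yield [x for _, x in group]

    # list(group_indices([0, 1, 2, 5, 6, 9])) -> [[0, 1, 2], [5, 6], [9]]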
@@ -265,9 +278,18 @@ class HDF5TimeseriesFeatureData(FeatureData):
     """
     Saves the current cadence of features to disk and clear out data
     """
-    name = "%d_%d" % (start_time, self.cadence)
     for key in self.keys:
-        create_new_dataset(path, base, self.feature_data[key], name=name, group=key, tmp=tmp, metadata=self.metadata)
+        nonnan_indices = list(numpy.where(numpy.isfinite(self.feature_data[key]['time']))[0])
+
+        ### split up and save datasets into contiguous segments
+        for idx_group in group_indices(nonnan_indices):
+            start_idx, end_idx = idx_group[0], idx_group[-1]
+            start = start_time + float(start_idx) / self.sample_rate
+            end = start_time + float(end_idx + 1) / self.sample_rate
+            name = "%.6f_%.6f" % (float(start), float(end - start))
+            create_new_dataset(path, base, self.feature_data[key][start_idx:end_idx], name=name, group=key, tmp=tmp, metadata=self.metadata)
+
     ### clear out current features
     self.clear()

 def append(self, timestamp, features):
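To see what the new dump() logic produces, a small self-contained sketch; the GPS times, sample rate, and NaN padding pattern are illustrative, and group_indices is the Python 3 form shown earlier:

    import numpy
    from itertools import groupby

    def group_indices(indices):
        # contiguous runs of indices, as in the helper added above
        for _, g in groupby(enumerate(indices), lambda pair: pair[0] - pair[1]):
            yield [x for _, x in g]

    sample_rate = 1
    start_time = 1186700000
    # 'time' column of a 4-sample buffer; NaN marks samples with no trigger
    times = numpy.array([1186700000.2, numpy.nan, 1186700002.7, 1186700003.1])

    for idx_group in group_indices(list(numpy.where(numpy.isfinite(times))[0])):
        start_idx, end_idx = idx_group[0], idx_group[-1]
        start = start_time + float(start_idx) / sample_rate
        end = start_time + float(end_idx + 1) / sample_rate
        print("%.6f_%.6f" % (start, end - start))
    # -> 1186700000.000000_1.000000
    #    1186700002.000000_2.000000

Each dataset name now records the actual start and span of the rows it contains, rather than one fixed name per cadence, which is what span-based trigger discovery keys on.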
@@ -330,19 +352,19 @@ class TimeseriesFeatureQueue(object):
     Example:

     >>> # create the queue
-    >>> columns = ['trigger_time', 'snr']
+    >>> columns = ['time', 'snr']
     >>> channels = ['channel1']
     >>> queue = TimeseriesFeatureQueue(channels, columns, sample_rate=1, buffer_size=1)
     >>> # add features
-    >>> queue.append(123450, 'channel1', {'trigger_time': 123450.3, 'snr': 3.0})
-    >>> queue.append(123451, 'channel1', {'trigger_time': 123451.7, 'snr': 6.5})
-    >>> queue.append(123452, 'channel1', {'trigger_time': 123452.4, 'snr': 5.2})
+    >>> queue.append(123450, 'channel1', {'time': 123450.3, 'snr': 3.0})
+    >>> queue.append(123451, 'channel1', {'time': 123451.7, 'snr': 6.5})
+    >>> queue.append(123452, 'channel1', {'time': 123452.4, 'snr': 5.2})
     >>> # get oldest feature
     >>> row = queue.pop()
     >>> row['timestamp']
     123450
     >>> row['features']['channel1']
-    [{'snr': 3.0, 'trigger_time': 123450.3}]
+    [{'snr': 3.0, 'time': 123450.3}]

     """
     def __init__(self, channels, columns, **kwargs):
@@ -364,7 +386,7 @@
     self.counter[timestamp] += 1

     ### store row, aggregating if necessary
-    idx = self._idx(row['trigger_time'])
+    idx = self._idx(row['time'])
     if not self.in_queue[timestamp][channel][idx] or (row['snr'] > self.in_queue[timestamp][channel][idx]['snr']):
         self.in_queue[timestamp][channel][idx] = row
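The aggregation rule in append() keeps only the loudest trigger per sample slot. A minimal sketch, assuming _idx() maps a trigger time to its sample index within the buffer; the stand-in helper and values here are illustrative:

    sample_rate = 1
    timestamp = 123450

    def _idx(time):
        # hypothetical stand-in: index of the sample containing `time`
        return int((time - timestamp) * sample_rate)

    slot = [None] * sample_rate   # stands in for self.in_queue[timestamp][channel]
    for row in ({'time': 123450.3, 'snr': 3.0}, {'time': 123450.8, 'snr': 7.1}):
        idx = _idx(row['time'])
        if not slot[idx] or row['snr'] > slot[idx]['snr']:
            slot[idx] = row
    # slot[0] is now the snr = 7.1 row: quieter triggers landing in the
    # same sample are dropped rather than appended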