Skip to content
Snippets Groups Projects
Commit 8cbb9c71 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_feature_extractor + fxtools/utils.py: add option for switching between etg and timeseries pipeline modes easily

gstlal_feature_extractor + fxtools/utils.py: add option for switching between etg and timeseries pipeline modes easily, add feature queues and disk writing abilities that handle both independently
parent 6c2eac8a
No related branches found
No related tags found
No related merge requests found
......@@ -245,7 +245,12 @@ class MultiChannelHandler(simplehandler.Handler):
self.feature_start_time = options.feature_start_time
self.feature_end_time = options.feature_end_time
self.columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
self.feature_queue = utils.FeatureQueue(self.keys, self.columns, self.sample_rate)
### set up queue to cache features depending on pipeline mode
if options.feature_mode == 'timeseries':
self.feature_queue = utils.TimeseriesFeatureQueue(self.keys, self.columns, sample_rate = self.sample_rate)
elif options.feature_mode == 'etg':
self.feature_queue = utils.ETGFeatureQueue(self.keys, self.columns)
# set whether data source is live
self.is_live = data_source_info.data_source in data_source_info.live_sources
......@@ -258,7 +263,12 @@ class MultiChannelHandler(simplehandler.Handler):
# feature saving properties
if options.save_format == 'hdf5':
self.fdata = utils.HDF5FeatureData(self.columns, keys = self.keys, cadence = self.cadence, sample_rate = self.sample_rate)
if options.feature_mode == 'timeseries':
self.fdata = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.cadence, sample_rate = self.sample_rate)
elif options.feature_mode == 'etg':
self.fdata = utils.HDF5ETGFeatureData(self.columns, keys = self.keys, cadence = self.cadence)
else:
raise KeyError, 'not a valid feature mode option'
elif options.save_format == 'ascii':
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel")
......@@ -656,6 +666,7 @@ def parse_command_line():
group.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
group.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Set the filename description in which to save the output.")
group.add_option("--save-format", metavar = "string", default = "hdf5", help = "Specifies the save format (ascii/hdf5/kafka/bottle) of features written to disk. Default = hdf5")
group.add_option("--feature-mode", metavar = "string", default = "timeseries", help = "Specifies the mode for which features are generated (timeseries/etg). Default = timeseries")
group.add_option("--data-transfer", metavar = "string", default = "table", help = "Specifies the format of features transferred over-the-wire (table/row). Default = table")
group.add_option("--cadence", type = "int", default = 20, help = "Rate at which to write trigger files to disk. Default = 20 seconds.")
group.add_option("--persist-cadence", type = "int", default = 200, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Needs to be a multiple of save cadence. Default = 200 seconds.")
......@@ -674,6 +685,7 @@ def parse_command_line():
group.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
group.add_option("--nxydump-segment", metavar = "start:stop", help = "Set the time interval to dump from nxydump elements (optional).")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--snr-threshold", type = "float", default = 5.5, help = "Specifies the SNR threshold for features written to disk, required if 'feature-mode' option is set. Default = 5.5")
group.add_option("--feature-start-time", type = "int", metavar = "seconds", help = "Set the start time of the segment to output features in GPS seconds. Required unless --data-source=lvshm")
group.add_option("--feature-end-time", type = "int", metavar = "seconds", help = "Set the end time of the segment to output features in GPS seconds. Required unless --data-source=lvshm")
parser.add_option_group(group)
......@@ -920,7 +932,10 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, tee), "snrtimeseries_%s_%s.txt" % (channel, repr(rate)), segment = options.nxydump_segment)
# extract features from time series
thishead = pipeparts.mktrigger(pipeline, tee, int(rate // options.sample_rate), max_snr = True)
if options.feature_mode == 'timeseries':
thishead = pipeparts.mktrigger(pipeline, tee, int(rate // options.sample_rate), max_snr = True)
elif options.feature_mode == 'etg':
thishead = pipeparts.mktrigger(pipeline, tee, rate, snr_thresh = options.snr_threshold)
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('aftertrigger', 5, channel, rate))
......
......@@ -211,12 +211,12 @@ class FeatureData(object):
def clear(self):
raise NotImplementedError
class HDF5FeatureData(FeatureData):
class HDF5TimeseriesFeatureData(FeatureData):
"""
Saves feature data to hdf5.
Saves feature data to hdf5 as regularly sampled timeseries.
"""
def __init__(self, columns, keys, **kwargs):
super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
super(HDF5TimeseriesFeatureData, self).__init__(columns, keys = keys, **kwargs)
self.cadence = kwargs.pop('cadence')
self.sample_rate = kwargs.pop('sample_rate')
self.dtype = [(column, '<f8') for column in self.columns]
......@@ -250,15 +250,49 @@ class HDF5FeatureData(FeatureData):
for key in self.keys:
self.feature_data[key][:] = numpy.nan
class FeatureQueue(object):
class HDF5ETGFeatureData(FeatureData):
	"""!
	Saves feature data with varying dataset lengths (when run in ETG mode) to hdf5.

	Unlike the timeseries variant, rows are accumulated per channel key in
	plain lists, so each dumped dataset may have a different length.
	"""
	def __init__(self, columns, keys, **kwargs):
		# NOTE: 'cadence' is still present in kwargs when super().__init__ runs
		# (it is popped afterwards) — preserved from the original ordering in
		# case the base class inspects it.
		super(HDF5ETGFeatureData, self).__init__(columns, keys = keys, **kwargs)
		self.cadence = kwargs.pop('cadence')
		# one structured-array field per feature column, little-endian float64
		self.dtype = [(column, '<f8') for column in self.columns]
		# clear() creates the per-key row buffers; the original also built the
		# dict inline here, which clear() immediately overwrote (dead init)
		self.clear()

	def dump(self, path, base, start_time, tmp = False):
		"""
		Saves the current cadence of gps triggers to disk and clears out data.
		"""
		# dataset name encodes the GPS start time and the file cadence
		name = "%d_%d" % (start_time, self.cadence)
		for key in self.keys:
			create_new_dataset(path, base, numpy.array(self.feature_data[key], dtype = self.dtype), name = name, group = key, tmp = tmp)
		self.clear()

	def append(self, timestamp, features):
		"""
		Append a trigger row to data structure.
		NOTE: timestamp arg is here purely to match API, not used in append.
		"""
		for key, rows in features.items():
			for row in rows:
				# store column values in the declared column order
				self.feature_data[key].append(tuple(row[col] for col in self.columns))

	def clear(self):
		# reset every per-key buffer to an empty list
		self.feature_data = {key: [] for key in self.keys}
class TimeseriesFeatureQueue(object):
"""
Class for storing regularly sampled feature data.
NOTE: assumes that ingested features are time ordered.
"""
def __init__(self, channels, columns, sample_rate):
def __init__(self, channels, columns, **kwargs):
self.channels = channels
self.columns = columns
self.sample_rate = sample_rate
self.sample_rate = kwargs.pop('sample_rate')
self.out_queue = deque(maxlen = 5)
self.in_queue = {}
self.counter = Counter()
......@@ -303,6 +337,53 @@ class FeatureQueue(object):
def __len__(self):
return len(self.out_queue)
class ETGFeatureQueue(object):
	"""
	Caches feature rows per channel and releases whole per-timestamp buffers
	in time order; used when the pipeline runs in ETG mode, i.e. reports all
	triggers above an SNR threshold.
	NOTE: assumes that ingested features are time ordered.
	"""
	def __init__(self, channels, columns, **kwargs):
		self.channels = channels
		self.columns = columns
		# buffers ready for consumers; bounded so stale buffers get dropped
		self.out_queue = deque(maxlen = 5)
		# per-timestamp buffers still being filled: {timestamp: {channel: [rows]}}
		self.in_queue = {}
		self.counter = Counter()
		self.last_timestamp = 0
		# number of distinct timestamps held back before releasing the oldest
		self.effective_latency = 2

	def append(self, timestamp, channel, row):
		"""
		Add one feature row for a channel. Rows at or before the last
		released timestamp are discarded (assumed late arrivals).
		"""
		if timestamp <= self.last_timestamp:
			return
		### create new buffer if one isn't available for new timestamp
		if timestamp not in self.in_queue:
			self.in_queue[timestamp] = self._create_buffer()
		self.counter[timestamp] += 1
		### store row
		self.in_queue[timestamp][channel].append(row)
		### release the oldest buffer once enough newer samples have arrived
		if len(self.counter) > self.effective_latency:
			oldest = min(self.counter)
			self.last_timestamp = oldest
			self.out_queue.append({'timestamp': oldest, 'features': self.in_queue.pop(oldest)})
			del self.counter[oldest]

	def pop(self):
		"""
		Return the oldest released buffer, or None when none are ready.
		"""
		return self.out_queue.popleft() if self.out_queue else None

	def flush(self):
		"""
		Release every pending buffer, oldest first.
		"""
		while self.in_queue:
			oldest = min(self.counter)
			del self.counter[oldest]
			self.out_queue.append({'timestamp': oldest, 'features': self.in_queue.pop(oldest)})

	def _create_buffer(self):
		# fresh empty row list for every known channel
		return dict((channel, []) for channel in self.channels)

	def __len__(self):
		return len(self.out_queue)
#----------------------------------
### structures to generate basis waveforms
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment