Commit a52e5ca7 authored by Patrick Godwin

gstlal_etg: added hdf file saving ala h5py, added utility functions and classes to idq_aggregator.py
parent f667d235
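For orientation, here is a minimal sketch (illustrative only, not part of the commit) of how the new --save-hdf path is meant to be used by the handler: an idq_aggregator.HDF5FeatureData object holds one fixed-size row buffer per (channel, rate) key, each trigger is appended as a dictionary of feature columns, and the buffers are written out once per cadence in place of the ascii trigger files. The channel name, rate, tag, and GPS times below are hypothetical.

# Illustrative sketch only -- not part of the commit.
from gstlal import idq_aggregator

columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
keys = [('H1:FAKE-CHANNEL', 2048)]   # hypothetical (channel, rate) keys
fdata = idq_aggregator.HDF5FeatureData(columns, keys = keys, cadence = 32)

# inside the trigger loop: one feature row per trigger, keyed by (channel, rate)
row = dict((column, 0.) for column in columns)
fdata.append(row, key = keys[0], buftime = 1187000010)

# once per cadence: write one hdf5 file per key, then reset the buffers
fdata.dump('/path/to/out/H-TEST/H-TEST-11870', 'H-TEST-1187000000-32')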
@@ -170,13 +170,20 @@ class MultiChannelHandler(simplehandler.Handler):
self.last_save_time = None
self.cadence = options.cadence
self.tag = '%s-%s' % (self.instrument[:1], self.description)
# create header for trigger file
if options.latency:
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency")
# hdf saving properties
if options.save_hdf:
columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
self.fdata = idq_aggregator.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
# ascii saving properties
else:
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel")
self.fdata = deque(maxlen = 25000)
self.fdata.append(self.header)
self.tag = '%s-%s' % (self.instrument[:1], self.description)
# create header for trigger file
if options.latency:
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency")
else:
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel")
self.fdata = deque(maxlen = 25000)
self.fdata.append(self.header)
# set initialization time
if options.data_source in ("framexmit", "lvshm"):
@@ -260,9 +267,14 @@ class MultiChannelHandler(simplehandler.Handler):
# Save triggers (in ascii format) once every cadence
if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
self.to_trigger_file(buftime)
self.fdata.clear()
self.fdata.append(self.header)
if options.save_hdf:
fname = '%s-%d-%d' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence)
fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
self.fdata.dump(fpath, fname)
else:
self.to_trigger_file(buftime)
self.fdata.clear()
self.fdata.append(self.header)
self.last_save_time = buftime
# read buffer contents
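# Illustrative sketch (not part of the commit) of the path and filename
# arithmetic used by the hdf5 branch above, with a hypothetical tag and
# GPS time.
import os
from gstlal import idq_aggregator

tag, cadence, last_save_time = 'H-TEST', 32, 1187000031
fname = '%s-%d-%d' % (tag, idq_aggregator.floor_div(last_save_time, cadence), cadence)
# floor_div(1187000031, 32) == 1187000000, so fname == 'H-TEST-1187000000-32'
fpath = os.path.join('/path/to/out', tag, tag + "-" + str(fname.split("-")[2])[:5])
# fpath == '/path/to/out/H-TEST/H-TEST-11870'
# HDF5FeatureData.dump(fpath, fname) then writes, for every (channel, rate) key,
# '<fpath>/<channel>/<rate zero-padded to 4 digits>/<fname>.hdf5'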
@@ -303,15 +315,18 @@ class MultiChannelHandler(simplehandler.Handler):
# NOTE
# Setting stop time to trigger time for use with half sine gaussians
stop_time = trigger_time
# save iDQ compatible data
if options.latency:
self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag, latency))
else:
self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag))
# append row to feature vector for bottle requests
etg_row = {'timestamp': buftime, 'channel': channel, 'rate': rate, 'start_time': start_time, 'stop_time': stop_time, 'trigger_time': trigger_time,
'frequency': freq, 'q': q, 'phase': row.phase, 'sigma_sq': row.sigmasq, 'chi_sq': row.chisq, 'snr': row.snr}
'frequency': freq, 'q': q, 'phase': row.phase, 'sigmasq': row.sigmasq, 'chisq': row.chisq, 'snr': row.snr}
self.etg_event.append(etg_row)
# save iDQ compatible data
if options.save_hdf:
self.fdata.append(etg_row, key = (channel, rate), buftime = buftime)
else:
if options.latency:
self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag, latency))
else:
self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag))
def to_trigger_file(self, buftime = None):
@@ -481,6 +496,7 @@ def parse_command_line():
parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
parser.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.")
parser.add_option("--etg-partition", metavar = "string", help = "If using Kafka, sets the partition that this ETG is assigned to.")
parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this ETG publishes feature vector subsets to.")
@@ -669,7 +685,8 @@ if options.verbose:
mainloop.run()
# save remaining triggers
handler.to_trigger_file()
if not options.save_hdf:
handler.to_trigger_file()
#
# Shut down pipeline
@@ -29,8 +29,8 @@ import os
import glob
import sys
import h5py
import numpy
import pandas
from lal import gpstime
from gstlal import aggregator
@@ -42,141 +42,59 @@ from gstlal import aggregator
#
####################
def reduce_data(data, gps_end, path, reduce_count, channel, rate, reduce_cadence = 100, aggregate = 'max', level = 0):
"""!
This function does a data reduction recursively as needed
by powers of 10 where level specifies the power.
Default minimum is 1 e.g., reduction by 1 order of magnitude.
"""
if (reduce_count % (10 ** level) == 0 and level < aggregator.DIRS):
gps_start = gps_end - reduce_cadence * (10 ** level)
if level == 0:
agg_data = aggregate_data(data, gps_start, gps_end, aggregate = aggregate)
else:
agg_data = aggregate_data(get_dataset_by_range(gps_start, gps_end, path, channel, rate, aggregate = aggregate, level = level - 1), gps_start, gps_end, level = level)
path = update_agg_path(path, gps_start, cur_level = level - 1, new_level = level)
if agg_data is not None:
create_new_dataset(path, 'aggregates', agg_data, channel, rate, aggregate = aggregate)
reduce_data(data, gps_end, path, reduce_count, channel, rate, aggregate = aggregate, level = level + 1)
def aggregate_data(data, gps_start, gps_end, column = 'snr', aggregate = 'max', level = 0):
"""!
Reduces data of a given level for a given gps range,
column, and aggregate. Returns the aggregated data.
"""
gps_start_idx = floor_div(gps_start, 10 ** (level+1))
gps_end_idx = floor_div(gps_end, 10 ** (level+1))
if aggregate == 'max':
max_index = get_dataframe_subset(gps_start_idx, gps_end_idx, data, level = level).groupby(pandas.TimeGrouper('%ds' % (10 ** (level+1))))[column].idxmax().dropna().values
else:
raise NotImplementedError
if max_index.size > 0:
return data.loc[max_index]
else:
return None
def get_dataset(path, base, channel, rate, aggregate = None):
def get_dataset(path, base):
"""!
open a dataset at @param path with name @param base and return the data
"""
fname = os.path.join(path, "%s.h5" % base)
with pandas.HDFStore(fname) as store:
if aggregate is None:
return store.select('%s/%s/data' % (channel, str(rate)))
else:
return store.select('%s/%s/%s' % (channel, str(rate), aggregate))
def create_new_dataset(path, base, data, channel, rate, aggregate = None, tmp = False):
fname = os.path.join(path, "%s.hdf5" % base)
try:
f = h5py.File(fname, "r")
fields = f.keys()
data = zip(*[f[field] for field in fields])
f.close()
d_types = [(str(field), 'f8') for field in fields]
data = numpy.array(data, dtype=d_types)
return fname, data
except IOError:
return fname, []
def create_new_dataset(path, base, fields, data = None, tmp = False):
"""!
A function to create a new dataset with data @param data.
The data will be stored in an hdf5 file at path @param path with
base name @param base. You can also make a temporary file.
"""
# create path if one doesn't already exist
if not os.path.exists(path):
aggregator.makedir(path)
if tmp:
fname = os.path.join(path, "%s.h5.tmp" % base)
fname = os.path.join(path, "%s.hdf5.tmp" % base)
else:
fname = os.path.join(path, "%s.h5" % base)
with pandas.HDFStore(fname) as store:
if aggregate is None:
store.put('%s/%s/data' % (channel, str(rate)), data)
# A non temp dataset should not be overwritten
fname = os.path.join(path, "%s.hdf5" % base)
if os.path.exists(fname):
return fname
# create dir if it doesn't exist
if not os.path.exists(path):
aggregator.makedir(path)
# save data to hdf5
f = h5py.File(fname, "w")
for field in fields:
if data is None:
f.create_dataset(field, (0,), dtype="f8")
else:
store.append('%s/%s/%s' % (channel, str(rate), aggregate), data)
return fname
f.create_dataset(field, (len(data[field]),), dtype="f8")
f[field][...] = data[field]
def get_dataset_by_range(gps_start, gps_end, path, channel, rate, aggregate = None, level = 0):
"""!
Returns a dataset for a given aggregation level and gps range.
"""
global_start_index = floor_div(gps_start, 10 ** (level+1))
global_end_index = floor_div(gps_end, 10 ** (level+1))
gps_epochs = (floor_div(t, aggregator.MIN_TIME_QUANTA * (10 ** level)) for t in range(global_start_index, global_end_index, aggregator.MIN_TIME_QUANTA * (10 ** level)))
# generate all relevant files and combine to one dataset
# FIXME: more efficient to filter relevant datasets first
# rather than overpopulating dataset and then filtering
paths = (update_agg_path(path, gps_epoch, cur_level = level, new_level = level) for gps_epoch in gps_epochs)
if aggregate is None:
pattern = '[0-9]' * 10 + '.h5'
filelist = (glob.glob(os.path.join(path, pattern)) for path in paths)
# flatten list
files = (file_ for sublist in filelist for file_ in sublist)
else:
files = (os.path.join(path, 'aggregates.h5') for path in paths)
datasets = (pandas.read_hdf(file_, '%s/%s/%s' % (channel, rate, aggregate)) for file_ in files)
return get_dataframe_subset(global_start_index, global_end_index, pandas.concat(datasets), level = level)
def get_dataframe_subset(gps_start, gps_end, dataframe, level = 0):
start_index = gps_to_index(gps_start)
end_index = gps_to_index(gps_end - (10 ** level))
return dataframe.loc[start_index:end_index]
def create_dataframe_index(gps_time):
"""
Returns a datetime index based on gps times
per minimum aggregator quanta.
"""
start_time = gps_to_index(floor_div(gps_time, aggregator.MIN_TIME_QUANTA))
return pandas.date_range(start_time, periods = aggregator.MIN_TIME_QUANTA, freq = 's')
def gps_to_index(gps_time):
# FIXME: round microseconds is a workaround for possible bug in the
# gpstime module in lal that adds a microsecond
return pandas.Timestamp(gpstime.gps_to_utc(gps_time).replace(microsecond=0))
def gps_from_index(timestamp):
return int(gpstime.utc_to_gps(timestamp))
def to_agg_path(base_path, tag, gps_time, level = 0):
"""
Returns a hierarchical gps time path based on
channel rate and level in hierarchy.
e.g. level 0: base/tag/1/2/3/4/5/6/
e.g. level 2: base/tag/1/2/3/4/
"""
path = os.path.join(base_path, tag)
path = os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = level))
return path
def update_agg_path(path, gps_time, cur_level = 0, new_level = 0):
"""
Returns an updated aggregator path based on
an existing path and a gps time.
"""
for agg_level in range(max(0, cur_level), aggregator.DIRS):
path, _ = os.path.split(path)
return os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = new_level))
f.close()
return fname
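# A round-trip sketch (not part of the commit) for the two h5py helpers above:
# create_new_dataset() writes one float64 dataset per field (and returns early
# rather than overwrite an existing non-temporary file), while get_dataset()
# reads the fields back into a structured numpy array. Paths and values are
# hypothetical.
import numpy
from gstlal import idq_aggregator

fields = ['trigger_time', 'frequency', 'snr']
data = {'trigger_time': numpy.array([1187000000.5]), 'frequency': numpy.array([128.0]), 'snr': numpy.array([7.3])}
fname = idq_aggregator.create_new_dataset('/tmp/example', 'test', fields, data = data)
# -> '/tmp/example/test.hdf5'
fname, rows = idq_aggregator.get_dataset('/tmp/example', 'test')
# rows is a structured array with one 'f8' column per field found in the file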
def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
"""
"""!
Returns whether new and old gps times are in different
epochs.
"""
return (new_gps_time - floor_div(prev_gps_time, gps_epoch)) >= gps_epoch
def floor_div(x, n):
"""
"""!
Floor an integer by removing its remainder
from integer division by another integer n.
e.g. floor_div(163, 10) = 160
@@ -184,3 +102,68 @@ def floor_div(x, n):
"""
assert n > 0
return (x / n) * n
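# Worked example (illustrative GPS times) for the two epoch helpers above.
from gstlal import idq_aggregator

assert idq_aggregator.floor_div(163, 10) == 160
assert idq_aggregator.floor_div(1187000031, 32) == 1187000000
# still inside the same 32 s epoch: 1187000031 - 1187000000 == 31 < 32
assert not idq_aggregator.in_new_epoch(1187000031, 1187000010, 32)
# crossed into the next epoch: 1187000032 - 1187000000 == 32 >= 32
assert idq_aggregator.in_new_epoch(1187000032, 1187000010, 32)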
####################
#
# classes
#
####################
class FeatureData(object):
"""!
Base class for saving feature data.
Extend for a specific file-based implementation.
"""
def __init__(self, columns, keys = None, **kwargs):
self.keys = keys
self.columns = columns
self.etg_data = {}
def dump(self, path):
raise NotImplementedError
def load(self, path):
raise NotImplementedError
def pop(self):
raise NotImplementedError
def append(self, key, value):
raise NotImplementedError
def clear(self):
raise NotImplementedError
class HDF5FeatureData(FeatureData):
"""!
Saves feature data to hdf5.
"""
def __init__(self, columns, keys, **kwargs):
super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
self.cadence = kwargs.pop("cadence")
self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys}
for key in keys:
self.etg_data[key][:] = numpy.nan
def dump(self, path, base):
for key in self.keys:
key_path = os.path.join(path, str(key[0]), str(key[1]).zfill(4))
create_new_dataset(key_path, base, self.columns, data = self.etg_data[key])
self.clear()
def load(self, path):
raise NotImplementedError
def pop(self):
raise NotImplementedError
def append(self, value, key = None, buftime = None):
if buftime and key:
idx = buftime % self.cadence
self.etg_data[key][idx] = numpy.array([value[column] for column in self.columns])
def clear(self):
for key in self.keys:
self.etg_data[key][:] = numpy.nan
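# Sketch (not part of the commit) of HDF5FeatureData's buffer behaviour, using
# a hypothetical (channel, rate) key: each key owns a cadence-long structured
# array initialised to NaN, append() places a row at index buftime % cadence
# (so each GPS second keeps at most one row per key), and dump() writes
# '<path>/<channel>/<rate>/<base>.hdf5' for every key before clearing.
import numpy
from gstlal import idq_aggregator

columns = ['trigger_time', 'frequency', 'snr']
keys = [('L1:FAKE-CHANNEL', 512)]
fdata = idq_aggregator.HDF5FeatureData(columns, keys, cadence = 32)
assert numpy.all(numpy.isnan(fdata.etg_data[keys[0]]['snr']))

row = {'trigger_time': 1187000005.25, 'frequency': 64.0, 'snr': 6.1}
fdata.append(row, key = keys[0], buftime = 1187000005)   # stored at index 1187000005 % 32 == 5
fdata.dump('/tmp/features', 'L-TEST-1187000000-32')      # one file per key, then clear()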