Commit e7e152d6 authored by Patrick Godwin

gstlal_etg + idq_aggregator.py: changed hdf5 file IO to write to a single file

parent e4a77e89
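
The commit's net effect: instead of one small .hdf5 file per (channel, rate) key and cadence window, triggers now accumulate in a single .h5 file, with one HDF5 group per key and one dataset per cadence window. A sketch of reading the new layout with h5py; the file name, channel, rate, and GPS times below are invented for illustration:

	import h5py

	# group path:    '<channel>/<rate, zero-padded to 4 digits>'
	# dataset name:  '<window start GPS time>_<cadence>'
	with h5py.File('H-TEST-1187000000-1187100000.h5', 'r') as hfile:
		triggers = hfile['H1:FOO/4096']['1187000000_32']
		print(triggers['snr'])  # one column of the structured trigger rows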
gstlal_etg
@@ -167,16 +167,16 @@ class MultiChannelHandler(simplehandler.Handler):
 		self.keys = kwargs.pop("keys")

 		# iDQ saving properties
-		self.last_save_time = None
 		self.cadence = options.cadence
-		self.tag = '%s-%s' % (self.instrument[:1], self.description)
 		# hdf saving properties
 		if options.save_hdf:
 			self.last_save_time = {key:None for key in self.keys}
 			columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
 			self.fdata = idq_aggregator.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
 		# ascii saving properties
 		else:
+			self.tag = '%s-%s' % (self.instrument[:1], self.description)
+			self.last_save_time = None
 			# create header for trigger file
 			if options.latency:
 				self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency")
@@ -262,20 +262,29 @@ class MultiChannelHandler(simplehandler.Handler):
 			self.etg_event_time = buftime

 			# set save times appropriately
-			if self.last_save_time is None:
-				self.last_save_time = buftime
+			if options.save_hdf:
+				if self.last_save_time[(channel, rate)] is None:
+					self.last_save_time[(channel, rate)] = buftime
+			else:
+				if self.last_save_time is None:
+					self.last_save_time = buftime

 			# Save triggers (in ascii format) once every cadence
-			if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
-				if options.save_hdf:
-					fname = '%s-%d-%d' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence)
+			if options.save_hdf:
+				if idq_aggregator.in_new_epoch(buftime, self.last_save_time[(channel, rate)], self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
+					if options.gps_end_time:
+						fname = '%s-%s-%s' % (self.tag, options.gps_start_time, options.gps_end_time)
+					else:
+						fname = '%s-%d' % (self.tag, self.init_gps_time)
 					fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
-					self.fdata.dump(fpath, fname)
-				else:
+					self.fdata.dump(fpath, fname, idq_aggregator.floor_div(self.last_save_time[(channel, rate)], self.cadence), key=(channel, rate))
+					self.last_save_time[(channel, rate)] = buftime
+			else:
+				if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
 					self.to_trigger_file(buftime)
 					self.fdata.clear()
 					self.fdata.append(self.header)
-				self.last_save_time = buftime
+					self.last_save_time = buftime

 		# read buffer contents
 		for i in range(buf.n_memory()):
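The per-key bookkeeping above hinges on idq_aggregator.in_new_epoch and floor_div. A minimal sketch of the GPS-window arithmetic those names suggest (an illustration, not the gstlal source itself):

	def floor_div(x, n):
		# round x down to the nearest multiple of n
		return (x // n) * n

	def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
		# True once new_gps_time has crossed into a later gps_epoch-sized window
		return floor_div(new_gps_time, gps_epoch) > floor_div(prev_gps_time, gps_epoch)

	assert floor_div(1187000050, 32) == 1187000032
	assert in_new_epoch(1187000065, 1187000050, 32)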
idq_aggregator.py
@@ -42,48 +42,50 @@ from gstlal import aggregator
 #
 ####################

-def get_dataset(path, base):
+def get_dataset(path, base, name = 'data', group = None):
 	"""!
 	open a dataset at @param path with name @param base and return the data
 	"""
-	fname = os.path.join(path, "%s.hdf5" % base)
+	fname = os.path.join(path, "%s.h5" % base)
 	try:
-		f = h5py.File(fname, "r")
-		fields = f.keys()
-		data = zip(*[f[field] for field in fields])
-		f.close()
-		d_types = [(str(field), 'f8') for field in fields]
-		data = numpy.array(data, dtype=d_types)
+		with h5py.File(fname, 'r') as hfile:
+			if group:
+				data = numpy.array(hfile[group][name])
+			else:
+				data = numpy.array(hfile[name])
 		return fname, data
 	except IOError:
 		return fname, []

-def create_new_dataset(path, base, fields, data = None, tmp = False):
+def create_new_dataset(path, base, data, name = 'data', group = None, tmp = False):
 	"""!
 	A function to create a new dataset with data @param data.
 	The data will be stored in an hdf5 file at path @param path with
 	base name @param base. You can also make a temporary file.
 	"""
 	if tmp:
-		fname = os.path.join(path, "%s.hdf5.tmp" % base)
+		fname = os.path.join(path, "%s.h5.tmp" % base)
 	else:
 		# A non temp dataset should not be overwritten
-		fname = os.path.join(path, "%s.hdf5" % base)
-		if os.path.exists(fname):
-			return fname
+		fname = os.path.join(path, "%s.h5" % base)
 	# create dir if it doesn't exist
 	if not os.path.exists(path):
 		aggregator.makedir(path)
+	print >>sys.stderr, "filename = %s" % fname
+	print >>sys.stderr, "dataset name = %s" % name
+	print >>sys.stderr, "group = %s" % group
+	print >>sys.stderr, "data = %s" % repr(data)
 	# save data to hdf5
-	f = h5py.File(fname, "w")
-	for field in fields:
-		if data is None:
-			f.create_dataset(field, (0,), dtype="f8")
+	with h5py.File(fname, 'a') as hfile:
+		if group:
+			if group not in hfile:
+				hfile.create_group(group)
+			hfile[group].create_dataset(name, data=data)
 		else:
-			f.create_dataset(field, (len(data[field]),), dtype="f8")
-			f[field][...] = data[field]
+			hfile.create_dataset(name, data=data)
-	f.close()
 	return fname

 def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
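A round trip through the rewritten helpers under the new single-file layout; the directory, base name, group, and dtype here are hypothetical:

	import numpy

	triggers = numpy.zeros((4,), dtype=[('trigger_time', 'f8'), ('snr', 'f8')])

	# appends dataset '1187000000_32' under group 'H1:FOO/4096' in somedir/test.h5;
	# repeated calls with new names grow the same file, since it is opened in 'a' mode
	create_new_dataset('somedir', 'test', triggers, name='1187000000_32', group='H1:FOO/4096')

	# returns (filename, data); data is [] if the file does not exist yet
	fname, data = get_dataset('somedir', 'test', name='1187000000_32', group='H1:FOO/4096')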
@@ -142,17 +144,25 @@ class HDF5FeatureData(FeatureData):
 	"""
 	def __init__(self, columns, keys, **kwargs):
 		super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
-		self.cadence = kwargs.pop("cadence")
+		self.cadence = kwargs.pop('cadence')
 		self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys}
 		for key in keys:
 			self.etg_data[key][:] = numpy.nan

-	def dump(self, path, base):
-		for key in self.keys:
-			key_path = os.path.join(path, str(key[0]), str(key[1]).zfill(4))
-			create_new_dataset(key_path, base, self.columns, data = self.etg_data[key])
-		self.clear()
+	def dump(self, path, base, start_time, key = None):
+		"""
+		Saves the current cadence of gps triggers to disk and clear out data
+		"""
+		name = "%d_%d" % (start_time, self.cadence)
+		if key:
+			group = os.path.join(str(key[0]), str(key[1]).zfill(4))
+			create_new_dataset(path, base, self.etg_data[key], name=name, group=group)
+			self.clear(key)
+		else:
+			for key in self.keys:
+				group = os.path.join(str(key[0]), str(key[1]).zfill(4))
+				create_new_dataset(path, base, self.etg_data[key], name=name, group=group)
+			self.clear()

 	def load(self, path):
 		raise NotImplementedError
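With the new signature, gstlal_etg dumps one (channel, rate) key at a time as that key's cadence window rolls over, so every window lands in the same file as a new dataset. A hypothetical driving sequence (channel name and values made up):

	fdata = HDF5FeatureData(['trigger_time', 'snr'], keys=[('H1:FOO', 4096)], cadence=32)
	fdata.append({'trigger_time': 1187000001.5, 'snr': 8.2}, key=('H1:FOO', 4096), buftime=1187000001)

	# writes dataset '1187000000_32' under group 'H1:FOO/4096' in somedir/test.h5,
	# then clears only that key's in-memory buffer
	fdata.dump('somedir', 'test', 1187000000, key=('H1:FOO', 4096))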
@@ -160,10 +170,16 @@ class HDF5FeatureData(FeatureData):
 		raise NotImplementedError

 	def append(self, value, key = None, buftime = None):
+		"""
+		Append a trigger row to data structure
+		"""
 		if buftime and key:
 			idx = buftime % self.cadence
 			self.etg_data[key][idx] = numpy.array([value[column] for column in self.columns])

-	def clear(self):
-		for key in self.keys:
+	def clear(self, key = None):
+		if key:
 			self.etg_data[key][:] = numpy.nan
+		else:
+			for key in self.keys:
+				self.etg_data[key][:] = numpy.nan
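append indexes its per-key buffer with buftime % cadence, i.e. one row slot per GPS second inside the current window, wrapping at the window boundary; clear(key) now resets a single key's buffer instead of all of them. For example, with cadence = 32:

	assert 1187000000 % 32 == 0   # first slot of a window
	assert 1187000031 % 32 == 31  # last slot of the same window
	assert 1187000032 % 32 == 0   # wraps into the next window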