diff --git a/gstlal-ugly/bin/gstlal_etg b/gstlal-ugly/bin/gstlal_etg index 75b5bafe68d452f3a1ef57cc2eabb5e332f826ad..adf9c2134cdcac30a14bde2bf2de55fedc6c5aea 100755 --- a/gstlal-ugly/bin/gstlal_etg +++ b/gstlal-ugly/bin/gstlal_etg @@ -167,16 +167,16 @@ class MultiChannelHandler(simplehandler.Handler): self.keys = kwargs.pop("keys") # iDQ saving properties - self.last_save_time = None self.cadence = options.cadence self.tag = '%s-%s' % (self.instrument[:1], self.description) # hdf saving properties if options.save_hdf: + self.last_save_time = {key:None for key in self.keys} columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq'] self.fdata = idq_aggregator.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence) # ascii saving properties else: - self.tag = '%s-%s' % (self.instrument[:1], self.description) + self.last_save_time = None # create header for trigger file if options.latency: self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency") @@ -262,20 +262,29 @@ class MultiChannelHandler(simplehandler.Handler): self.etg_event_time = buftime # set save times appropriately - if self.last_save_time is None: - self.last_save_time = buftime + if options.save_hdf: + if self.last_save_time[(channel, rate)] is None: + self.last_save_time[(channel, rate)] = buftime + else: + if self.last_save_time is None: + self.last_save_time = buftime # Save triggers (in ascii format) once every cadence - if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)): - if options.save_hdf: - fname = '%s-%d-%d' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence) + if options.save_hdf: + if idq_aggregator.in_new_epoch(buftime, self.last_save_time[(channel, rate)], self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)): + if options.gps_end_time: + fname = '%s-%s-%s' % (self.tag, options.gps_start_time, options.gps_end_time) + else: + fname = '%s-%d' % (self.tag, self.init_gps_time) fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5]) - self.fdata.dump(fpath, fname) - else: + self.fdata.dump(fpath, fname, idq_aggregator.floor_div(self.last_save_time[(channel, rate)], self.cadence), key=(channel, rate)) + self.last_save_time[(channel, rate)] = buftime + else: + if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)): self.to_trigger_file(buftime) self.fdata.clear() self.fdata.append(self.header) - self.last_save_time = buftime + self.last_save_time = buftime # read buffer contents for i in range(buf.n_memory()): diff --git a/gstlal-ugly/python/idq_aggregator.py b/gstlal-ugly/python/idq_aggregator.py index e3819f5e170bcc1861fcb1ceddb761c6792bb43e..f7dc5d1e6cab9e70f623b83296209a144bd6904f 100644 --- a/gstlal-ugly/python/idq_aggregator.py +++ b/gstlal-ugly/python/idq_aggregator.py @@ -42,48 +42,50 @@ from gstlal import aggregator # #################### -def get_dataset(path, base): +def get_dataset(path, base, name = 'data', group = None): """! open a dataset at @param path with name @param base and return the data """ - fname = os.path.join(path, "%s.hdf5" % base) + fname = os.path.join(path, "%s.h5" % base) try: - f = h5py.File(fname, "r") - fields = f.keys() - data = zip(*[f[field] for field in fields]) - f.close() - d_types = [(str(field), 'f8') for field in fields] - data = numpy.array(data, dtype=d_types) + with h5py.File(fname, 'r') as hfile: + if group: + data = numpy.array(hfile[group][name]) + else: + data = numpy.array(hfile[name]) return fname, data except IOError: return fname, [] -def create_new_dataset(path, base, fields, data = None, tmp = False): +def create_new_dataset(path, base, data, name = 'data', group = None, tmp = False): """! A function to create a new dataset with data @param data. The data will be stored in an hdf5 file at path @param path with base name @param base. You can also make a temporary file. """ if tmp: - fname = os.path.join(path, "%s.hdf5.tmp" % base) + fname = os.path.join(path, "%s.h5.tmp" % base) else: - # A non temp dataset should not be overwritten - fname = os.path.join(path, "%s.hdf5" % base) - if os.path.exists(fname): - return fname + fname = os.path.join(path, "%s.h5" % base) + # create dir if it doesn't exist if not os.path.exists(path): aggregator.makedir(path) + + print >>sys.stderr, "filename = %s" %fname + print >>sys.stderr, "dataset name = %s" % name + print >>sys.stderr, "group = %s" % group + print >>sys.stderr, "data = %s" %repr(data) + # save data to hdf5 - f = h5py.File(fname, "w") - for field in fields: - if data is None: - f.create_dataset(field, (0,), dtype="f8") + with h5py.File(fname, 'a') as hfile: + if group: + if group not in hfile: + hfile.create_group(group) + hfile[group].create_dataset(name, data=data) else: - f.create_dataset(field, (len(data[field]),), dtype="f8") - f[field][...] = data[field] + hfile.create_dataset(name, data=data) - f.close() return fname def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch): @@ -142,17 +144,25 @@ class HDF5FeatureData(FeatureData): """ def __init__(self, columns, keys, **kwargs): super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs) - self.cadence = kwargs.pop("cadence") + self.cadence = kwargs.pop('cadence') self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys} - for key in keys: - self.etg_data[key][:] = numpy.nan - - def dump(self, path, base): - for key in self.keys: - key_path = os.path.join(path, str(key[0]), str(key[1]).zfill(4)) - create_new_dataset(key_path, base, self.columns, data = self.etg_data[key]) self.clear() + def dump(self, path, base, start_time, key = None): + """ + Saves the current cadence of gps triggers to disk and clear out data + """ + name = "%d_%d" % (start_time, self.cadence) + if key: + group = os.path.join(str(key[0]), str(key[1]).zfill(4)) + create_new_dataset(path, base, self.etg_data[key], name=name, group=group) + self.clear(key) + else: + for key in self.keys: + group = os.path.join(str(key[0]), str(key[1]).zfill(4)) + create_new_dataset(path, base, self.etg_data[key], name=name, group=group) + self.clear() + def load(self, path): raise NotImplementedError @@ -160,10 +170,16 @@ class HDF5FeatureData(FeatureData): raise NotImplementedError def append(self, value, key = None, buftime = None): + """ + Append a trigger row to data structure + """ if buftime and key: idx = buftime % self.cadence self.etg_data[key][idx] = numpy.array([value[column] for column in self.columns]) - def clear(self): - for key in self.keys: + def clear(self, key = None): + if key: self.etg_data[key][:] = numpy.nan + else: + for key in self.keys: + self.etg_data[key][:] = numpy.nan