Skip to content
Snippets Groups Projects
Commit 05e83de4 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_etg: fixed bug where hdf dataset rewrites caused pipeline to crash,...

gstlal_etg: fixed bug where hdf dataset rewrites caused pipeline to crash, fixed bug where last set of hdf triggers were not being written
parent fce93b2a
No related branches found
No related tags found
No related merge requests found
......@@ -184,14 +184,34 @@ class MultiChannelHandler(simplehandler.Handler):
self.instrument = kwargs.pop("instrument")
self.keys = kwargs.pop("keys")
# iDQ saving properties
# set initialization time
if options.data_source in ("framexmit", "lvshm"):
self.init_gps_time = int(aggregator.now())
else:
self.init_gps_time = int(options.gps_start_time)
### iDQ saving properties
self.cadence = options.cadence
self.tag = '%s-%s' % (self.instrument[:1], self.description)
# hdf saving properties
if options.save_hdf:
self.last_save_time = {key:None for key in self.keys}
columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
self.fdata = idq_aggregator.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
if options.gps_end_time:
duration = int(options.gps_end_time) - int(options.gps_start_time)
self.fname = '%s-%d-%d' % (self.tag, self.init_gps_time, duration)
else:
self.fname = '%s-%d-9999999999' % (self.tag, self.init_gps_time)
self.fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(self.fname.split("-")[2])[:5])
# delete leftover temporary files
temp_path = os.path.join(self.fpath, self.fname)+'.h5.tmp'
if os.path.isfile(temp_path):
os.remove(temp_path)
# ascii saving properties
else:
self.last_save_time = None
......@@ -203,12 +223,6 @@ class MultiChannelHandler(simplehandler.Handler):
self.fdata = deque(maxlen = 25000)
self.fdata.append(self.header)
# set initialization time
if options.data_source in ("framexmit", "lvshm"):
self.init_gps_time = int(aggregator.now())
else:
self.init_gps_time = int(options.gps_start_time)
# set up ETG bottle related properties
self.etg_event = deque(maxlen = 20000)
self.etg_event_time = None
......@@ -287,17 +301,10 @@ class MultiChannelHandler(simplehandler.Handler):
if self.last_save_time is None:
self.last_save_time = buftime
# Save triggers (in ascii format) once every cadence
# Save triggers (hdf or ascii) once per cadence
if options.save_hdf:
if idq_aggregator.in_new_epoch(buftime, self.last_save_time[(channel, rate)], self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
if options.gps_end_time:
# Uses the T050017 filenaming convention.
duration = int(options.gps_end_time) - int(options.gps_start_time)
fname = '%s-%s-%d' % (self.tag, options.gps_start_time, duration)
else:
fname = '%s-%d' % (self.tag, self.init_gps_time)
fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
self.fdata.dump(fpath, fname, idq_aggregator.floor_div(self.last_save_time[(channel, rate)], self.cadence), key=(channel, rate))
self.to_hdf_file((channel, rate))
self.last_save_time[(channel, rate)] = buftime
else:
if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
......@@ -383,6 +390,21 @@ class MultiChannelHandler(simplehandler.Handler):
latency = numpy.round(int(aggregator.now()) - buftime)
print >>sys.stdout, "buftime = %d, latency at write stage = %d" % (buftime, latency)
def to_hdf_file(self, key, final_write = False):
"""
Dumps triggers saved in memory to disk in hdf5 format.
Uses the T050017 filenaming convention.
NOTE: This method should only be called by an instance that is locked.
"""
self.fdata.dump(self.fpath, self.fname, idq_aggregator.floor_div(self.last_save_time[key], self.cadence), key = key, tmp = True)
# when job completes, move temp file over
if final_write:
final_path = os.path.join(self.fpath, self.fname)+".h5"
tmp_path = final_path+".tmp"
shutil.move(tmp_path, final_path)
def gen_psd_xmldoc(self):
xmldoc = lal.series.make_psd_xmldoc(self.psds)
process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_idq", {})
......@@ -712,9 +734,13 @@ if options.verbose:
mainloop.run()
# save remaining triggers
if not options.save_hdf:
if options.save_hdf:
for key in src.keys():
handler.to_hdf_file(key, final_write = True)
else:
handler.to_trigger_file()
#
# Shut down pipeline
#
......
......@@ -143,19 +143,19 @@ class HDF5FeatureData(FeatureData):
self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys}
self.clear()
def dump(self, path, base, start_time, key = None):
def dump(self, path, base, start_time, key = None, tmp = False):
"""
Saves the current cadence of gps triggers to disk and clear out data
"""
name = "%d_%d" % (start_time, self.cadence)
if key:
group = os.path.join(str(key[0]), str(key[1]).zfill(4))
create_new_dataset(path, base, self.etg_data[key], name=name, group=group)
create_new_dataset(path, base, self.etg_data[key], name=name, group=group, tmp=tmp)
self.clear(key)
else:
for key in self.keys:
group = os.path.join(str(key[0]), str(key[1]).zfill(4))
create_new_dataset(path, base, self.etg_data[key], name=name, group=group)
create_new_dataset(path, base, self.etg_data[key], name=name, group=group, tmp=tmp)
self.clear()
def load(self, path):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment