Skip to content
Snippets Groups Projects
Commit a5749fd5 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_idq_trigger_gen: fixed issue where last 10s of trigger data before...

gstlal_idq_trigger_gen: fixed issue where last 10s of trigger data before aggregator epoch from dataframes were not being saved, added more error handling
parent 0642ae58
No related branches found
No related tags found
No related merge requests found
......@@ -337,17 +337,13 @@ class MultiChannelHandler(simplehandler.Handler):
return Gst.FlowReturn.OK
def to_hdf5(self, channel, rate, buftime):
last_save_index = self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.hdf_cadence) - self.hdf_cadence
# check to make sure saving index only covers current dataframe index
if (last_save_index + self.hdf_cadence) % aggregator.MIN_TIME_QUANTA == 0:
last_save_index = last_save_index + self.hdf_cadence
# FIXME: doesn't aggregate properly, should be getting
# max of every cadence of triggers, but only grabs
# last trigger per reduce time instead
last_save_index = self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.hdf_cadence)
# case 1: save current triggers from prev leaf directory, aggregate, reindex and clean out dataframe, move to new leaf directory
# FIXME: Doesn't save last hdf save cadence of triggers,
# probably an 'off by one' implementation, will
# need to look into this, but have workaround
# for now.
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], aggregator.MIN_TIME_QUANTA)) >= aggregator.MIN_TIME_QUANTA:
current_save_index = self.truncate_int(buftime, aggregator.MIN_TIME_QUANTA) - self.hdf_cadence
current_save_index = self.truncate_int(buftime, aggregator.MIN_TIME_QUANTA)
for save_index in range(last_save_index, current_save_index, self.hdf_cadence):
try:
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
......@@ -360,14 +356,14 @@ class MultiChannelHandler(simplehandler.Handler):
if max_index.size > 0:
try:
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
except KeyError:
except (KeyError, AttributeError):
print >>sys.stderr, "Error saving dataframe aggregates to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
traceback.print_exc()
# reindex to clean out dataframe and save new triggers
self.dataframes[(channel, rate)] = self.dataframes[(channel, rate)].reindex(self.to_dataframe_index(self.truncate_int(self.last_hdf_save_time[(channel, rate)], aggregator.MIN_TIME_QUANTA) + aggregator.MIN_TIME_QUANTA))
else:
# case 2: save current triggers
current_save_index = self.truncate_int(buftime, self.hdf_cadence) - self.hdf_cadence
current_save_index = self.truncate_int(buftime, self.hdf_cadence)
for save_index in range(last_save_index, current_save_index, self.hdf_cadence):
try:
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
......@@ -382,7 +378,7 @@ class MultiChannelHandler(simplehandler.Handler):
if max_index.size > 0:
try:
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
except KeyError:
except (KeyError, AttributeError):
print >>sys.stderr, "Error saving dataframe aggregates to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
traceback.print_exc()
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment