Skip to content
Snippets Groups Projects
Commit 4d92aa91 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_idq_trigger_gen: added workaround for hdf writing causing program crash...

gstlal_idq_trigger_gen: added workaround for hdf writing causing program crash in rare edge cases, added comments, cosmetic changes
parent 0f76e735
No related branches found
No related tags found
No related merge requests found
......@@ -199,7 +199,7 @@ class MultiChannelHandler(simplehandler.Handler):
# dataframe/hdf saving properties
self.last_hdf_save_time = dict.fromkeys(self.keys, None)
self.hdf_cadence = 10
self.reduction_cadence = 100
self.reduce_cadence = 100
self.init_gps_time = int(aggregator.now())
# dataframe properties
......@@ -300,7 +300,15 @@ class MultiChannelHandler(simplehandler.Handler):
self.fdata += "%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag)
# save dataframe compatible data
buftime_index = self.truncate_int(buftime, self.hdf_cadence)
self.dataframes[(channel, int(rate))].loc[buftime_index, buftime] = numpy.array([start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr], dtype=float)
# FIXME:
# workaround for out of index error caused by
# reindexing being done for the first second after
# a new aggregator path has been made, every
# aggregator cadence
try:
self.dataframes[(channel, int(rate))].loc[buftime_index, buftime] = numpy.array([start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr], dtype=float)
except ValueError:
pass
memory.unmap(mapinfo)
......@@ -309,12 +317,14 @@ class MultiChannelHandler(simplehandler.Handler):
def to_hdf5(self, channel, rate, buftime):
last_save_index = self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.hdf_cadence) - self.hdf_cadence
# case 1: save current triggers from prev leaf directory, aggregate, reindex and clean out dataframe, move to new directory
# case 1: save current triggers from prev leaf directory, aggregate, reindex and clean out dataframe, move to new leaf directory
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], aggregator.MIN_TIME_QUANTA)) >= aggregator.MIN_TIME_QUANTA:
current_save_index = self.truncate_int(buftime, aggregator.MIN_TIME_QUANTA) - self.hdf_cadence
for save_index in numpy.arange(last_save_index, current_save_index, self.hdf_cadence):
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
max_index = self.dataframes[(channel, rate)].loc[last_save_index:current_save_index].groupby(level=0)['snr'].idxmax().dropna().values
# find gps times of max snr triggers per cadence and save to file
last_reduce_index = self.truncate_int(buftime, self.reduce_cadence) - self.hdf_cadence
max_index = self.dataframes[(channel, rate)].loc[last_reduce_index:current_save_index].groupby(level=0)['snr'].idxmax().dropna().values
if max_index.size > 0:
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
self.dataframes[(channel, rate)].reindex(self.to_dataframe_index(buftime))
......@@ -324,18 +334,31 @@ class MultiChannelHandler(simplehandler.Handler):
for save_index in numpy.arange(last_save_index, current_save_index, self.hdf_cadence):
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
# case 3: save current triggers from current directory and aggregate
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.reduction_cadence)) >= self.reduction_cadence:
max_index = self.dataframes[(channel, rate)].loc[last_save_index:current_save_index].groupby(level=0)['snr'].idxmax().dropna().values
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.reduce_cadence)) >= self.reduce_cadence:
# find gps times of max snr triggers per cadence and save to file
last_reduce_index = self.truncate_int(buftime, self.reduce_cadence) - self.hdf_cadence
max_index = self.dataframes[(channel, rate)].loc[last_reduce_index:current_save_index].groupby(level=0)['snr'].idxmax().dropna().values
if max_index.size > 0:
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
def to_dataframe_index(self, gps_time):
"""
Returns a two level index based on gps times
per minimum aggregator quanta.
"""
index_t = numpy.arange(self.truncate_int(gps_time, aggregator.MIN_TIME_QUANTA), self.truncate_int(gps_time, aggregator.MIN_TIME_QUANTA) + aggregator.MIN_TIME_QUANTA, dtype = numpy.int)
index_cadence = numpy.fromiter(( self.truncate_int(x, self.hdf_cadence) for x in index_t), dtype = numpy.int)
return pandas.MultiIndex.from_tuples(list(zip(index_cadence, index_t)), names = ['gps_time_cadence', 'gps_time'])
def to_agg_path(self, gps_time, channel, rate, level = 0):
path = options.out_path
"""
Returns a hierarchical gps time path based on
channel rate and level in hierarchy.
e.g. level 0: out-path/description/channel/1/2/3/4/5/6/rate/
e.g. level 2: out-path/description/channel/1/2/3/4/rate/
"""
tag = '%s-IDQ_TRIGGERS_BY_CHANNEL' % self.instrument[:1]
path = os.path.join(self.out_path, tag)
if channel is not None:
path = os.path.join(path, channel)
path = os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = level))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment