Commit 12ca1702 authored by Patrick Godwin

gstlal_idq_trigger_gen: removed excessive type casting, added more informative error handling

parent 08dc2e44
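The cast removal follows one pattern throughout: convert `rate` to an integer once, at the point where a `(channel, rate)` key is first built, so later dictionary lookups no longer need to re-wrap it in `int()`. In the diff below, that single cast moves into `LinkedAppSync`, so the keys handed to `MultiChannelHandler` already carry an integer rate. A minimal, hypothetical sketch of the idea (`SketchHandler` and its methods are illustrative names, not the gstlal code):

class SketchHandler(object):
	def __init__(self):
		# per-stream state keyed on (channel, rate), with rate already an int
		self.dataframes = {}
		self.last_hdf_save_time = {}

	def register_stream(self, channel, rate):
		# cast once, where the key is first constructed ...
		key = (channel, int(rate))
		self.dataframes[key] = None
		self.last_hdf_save_time[key] = None
		return key

	def on_buffer(self, channel, rate, buftime):
		# ... so lookups here can use `rate` directly, assuming it arrives as an int
		if self.last_hdf_save_time[(channel, rate)] is None:
			self.last_hdf_save_time[(channel, rate)] = buftime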
@@ -198,7 +198,7 @@ class MultiChannelHandler(simplehandler.Handler):
# create header for trigger file
# header for string output from dataframe
if options.triggers_from_dataframe:
self.header = "# %18s\t%17s\t%14s\t%9s\t%1s\t%1s\t%2s\t%7s\t%s\n" % ("start_time", "stop_time", "trigger_time", "freq", "phase", "sigmasq", "chisq", "snr", "channel")
self.header = "# %18s\t%17s\t%14s\t%9s\t%-2s\t%s\t%2s\t%7s\t%20s\n" % ("start_time", "stop_time", "trigger_time", "freq", "phase", "sigmasq", "chisq", "snr", "channel")
# header for standard output straight to string
else:
#self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%10s\t%10s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "frequency", "Q", "latency", "rate", "channel")
@@ -264,10 +264,10 @@ class MultiChannelHandler(simplehandler.Handler):
if self.last_save_time is None:
self.last_save_time = buftime
if self.last_hdf_save_time[(channel, int(rate))] is None:
-self.last_hdf_save_time[(channel, int(rate))] = buftime
+self.last_hdf_save_time[(channel, rate)] = buftime
# check if dataframe needs to be reindexed with new gps times before first buffer
if (buftime - self.truncate_int(self.init_gps_time, aggregator.MIN_TIME_QUANTA)) >= aggregator.MIN_TIME_QUANTA:
-self.dataframes[(channel, int(rate))] = self.dataframes[(channel, int(rate))].reindex(self.to_dataframe_index(buftime))
+self.dataframes[(channel, rate)] = self.dataframes[(channel, rate)].reindex(self.to_dataframe_index(buftime))
# create path if it doesn't already exist
if not os.path.exists(self.to_agg_path(buftime, channel = channel, rate = rate)):
aggregator.makedir(self.to_agg_path(buftime, channel = channel, rate = rate))
@@ -289,9 +289,9 @@ class MultiChannelHandler(simplehandler.Handler):
# hdf file saving
if options.triggers_from_dataframe:
-if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, int(rate))], self.hdf_cadence)) >= self.hdf_cadence:
-self.to_hdf5(channel, int(rate), buftime)
-self.last_hdf_save_time[(channel, int(rate))] = buftime
+if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.hdf_cadence)) >= self.hdf_cadence:
+self.to_hdf5(channel, rate, buftime)
+self.last_hdf_save_time[(channel, rate)] = buftime
# read buffer contents
for i in range(buf.n_memory()):
@@ -312,9 +312,9 @@ class MultiChannelHandler(simplehandler.Handler):
for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data):
trigger_time = row.end_time + row.end_time_ns * 1e-9
latency = numpy.round(int(aggregator.now()) - buftime)
-freq, q, duration = self.basis_params[(channel, int(rate))][row.channel_index]
+freq, q, duration = self.basis_params[(channel, rate)][row.channel_index]
start_time = trigger_time - duration
-channel_tag = ('%s_%i_%i' %(channel, int(rate)/4, int(rate)/2)).replace(":","_",1)
+channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)
# NOTE
# Setting stop time to trigger time for use with half sine gaussians
stop_time = trigger_time
@@ -326,10 +326,9 @@ class MultiChannelHandler(simplehandler.Handler):
buftime_index = self.truncate_int(buftime, self.hdf_cadence)
if options.triggers_from_dataframe:
try:
-self.dataframes[(channel, int(rate))].loc[buftime_index, buftime] = numpy.array([start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr], dtype=float)
+self.dataframes[(channel, rate)].loc[buftime_index, buftime] = numpy.array([start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr], dtype=float)
except ValueError:
print >>sys.stderr, "last hdf save time= %d" % self.last_hdf_save_time[(channel, int(rate))]
print >>sys.stderr, "Error saving buffer contents to dataframe at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, int(rate))
print >>sys.stderr, "Error saving buffer contents to dataframe at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
traceback.print_exc()
memory.unmap(mapinfo)
@@ -353,6 +352,7 @@ class MultiChannelHandler(simplehandler.Handler):
try:
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
except KeyError:
print >>sys.stderr, "Error saving dataframe to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
traceback.print_exc()
# find gps times of max snr triggers per cadence and save to file
last_reduce_index = self.truncate_int(buftime, self.reduce_cadence) - self.hdf_cadence
@@ -361,6 +361,7 @@ class MultiChannelHandler(simplehandler.Handler):
try:
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
except KeyError:
print >>sys.stderr, "Error saving dataframe aggregates to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
traceback.print_exc()
# reindex to clean out dataframe and save new triggers
self.dataframes[(channel, rate)] = self.dataframes[(channel, rate)].reindex(self.to_dataframe_index(self.truncate_int(self.last_hdf_save_time[(channel, rate)], aggregator.MIN_TIME_QUANTA) + aggregator.MIN_TIME_QUANTA))
@@ -371,6 +372,7 @@ class MultiChannelHandler(simplehandler.Handler):
try:
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
except KeyError:
print >>sys.stderr, "Error saving dataframe to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
traceback.print_exc()
# case 3: save current triggers from current directory and aggregate
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.reduce_cadence)) >= self.reduce_cadence:
@@ -381,6 +383,7 @@ class MultiChannelHandler(simplehandler.Handler):
try:
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
except KeyError:
print >>sys.stderr, "Error saving dataframe aggregates to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
traceback.print_exc()
def to_dataframe_string(self, gps_time):
@@ -395,9 +398,9 @@ class MultiChannelHandler(simplehandler.Handler):
col2_format = lambda x: '%8.3f' % x
trigger_format = {'start_time': time_format, 'stop_time': time_format, 'trigger_time': time_format, 'frequency': col1_format, 'phase': col2_format, 'sigmasq': col2_format, 'chisq': col2_format, 'snr': col1_format}
for (channel, rate) in self.keys:
-channel_tag = ('%s_%i_%i' %(channel, int(rate)/4, int(rate)/2)).replace(":","_",1)
-if not self.dataframes[(channel, int(rate))].loc[idx[:, self.last_save_time:gps_time], :].dropna().empty:
-self.fdata += self.dataframes[(channel, int(rate))].loc[idx[:, self.last_save_time:gps_time], :].assign(channel_tag = channel_tag).dropna().to_string(col_space = 4, header = False, index = False, formatters = trigger_format) + "\n"
+channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)
+if not self.dataframes[(channel, rate)].loc[idx[:, self.last_save_time:gps_time], :].dropna().empty:
+self.fdata += self.dataframes[(channel, rate)].loc[idx[:, self.last_save_time:gps_time], :].assign(channel_tag = channel_tag).dropna().to_string(header = False, index = False, formatters = trigger_format) + "\n"
def to_dataframe_index(self, gps_time):
"""
@@ -492,7 +495,7 @@ class LinkedAppSync(pipeparts.AppSync):
assert handler_id > 0
self.appsinks[appsink] = None
_, rate, channel = appsink.name.split("_", 2)
-self.sink_dict.setdefault(appsink, (channel, rate))
+self.sink_dict.setdefault(appsink, (channel, int(rate)))
return appsink
def pull_buffers(self, elem):
......
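The added error handling has the same shape in each hunk above: wrap the `.loc[...]` lookup and the `to_hdf` write in a try/except, print the buffer time, channel, and rate to stderr, then print the traceback, so a failed save identifies the stream it belongs to. A standalone sketch of that pattern (the `save_hdf` helper and its arguments are illustrative, not the handler's actual method):

from __future__ import print_function
import sys
import traceback

def save_hdf(df, save_index, path, buftime, channel, rate):
	# df is assumed to be a pandas DataFrame whose index contains save_index
	try:
		# .loc raises KeyError if save_index is missing from the dataframe index
		df.loc[save_index].to_hdf(path, 'data', format = 'table', mode = 'w')
	except KeyError:
		print("Error saving dataframe to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate), file = sys.stderr)
		traceback.print_exc()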