Skip to content
Snippets Groups Projects
Commit 5abb743c authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_idq_trigger_gen: added option to read triggers to disk from dataframe...

gstlal_idq_trigger_gen: added option to read triggers to disk from dataframe (new) or from string, fixed bug for missing triggers 10s after aggregator directory change
parent 15392cef
No related branches found
No related tags found
No related merge requests found
......@@ -157,7 +157,7 @@ def parse_command_line():
parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--triggers-from-dataframe", action = "store_true", default = False, help = "If set, will output iDQ-compatible triggers to disk straight from dataframe once every cadence")
#
# parse the arguments and sanity check
#
......@@ -193,8 +193,13 @@ class MultiChannelHandler(simplehandler.Handler):
self.last_save_time = None
self.cadence = options.cadence
# create header for trigger file
#self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%10s\t%10s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "frequency", "Q", "latency", "rate", "channel")
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "sigmasq", "chisq", "snr", "channel")
# header for string output from dataframe
if options.triggers_from_dataframe:
self.header = "# %18s\t%17s\t%14s\t%9s\t%1s\t%1s\t%2s\t%7s\t%s\n" % ("start_time", "stop_time", "trigger_time", "freq", "phase", "sigmasq", "chisq", "snr", "channel")
# header for standard output straight to string
else:
#self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%10s\t%10s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "frequency", "Q", "latency", "rate", "channel")
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "sigmasq", "chisq", "snr", "channel")
self.fdata = ""
# dataframe/hdf saving properties
......@@ -254,7 +259,7 @@ class MultiChannelHandler(simplehandler.Handler):
self.last_hdf_save_time[(channel, int(rate))] = buftime
# check if dataframe needs to be reindexed with new gps times before first buffer
if (buftime - self.truncate_int(self.init_gps_time, aggregator.MIN_TIME_QUANTA)) >= aggregator.MIN_TIME_QUANTA:
self.dataframes[(channel, int(rate))].reindex(self.to_dataframe_index(buftime))
self.dataframes[(channel, int(rate))] = self.dataframes[(channel, int(rate))].reindex(self.to_dataframe_index(buftime))
# create path if it doesn't already exist
if not os.path.exists(self.to_agg_path(buftime, channel = channel, rate = rate)):
aggregator.makedir(self.to_agg_path(buftime, channel = channel, rate = rate))
......@@ -262,9 +267,12 @@ class MultiChannelHandler(simplehandler.Handler):
# Save triggers once every cadence
# iDQ file saving
if ((buftime - self.truncate_int(self.last_save_time, self.cadence)) >= self.cadence):
self.to_trigger_file()
if options.triggers_from_dataframe:
self.to_trigger_file(self.to_dataframe_string(buftime))
else:
self.to_trigger_file()
self.fdata = ""
self.last_save_time = buftime
self.fdata = ""
# hdf file saving
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, int(rate))], self.hdf_cadence)) >= self.hdf_cadence:
......@@ -297,8 +305,9 @@ class MultiChannelHandler(simplehandler.Handler):
# Setting stop time to trigger time for use with half sine gaussians
stop_time = trigger_time
# save iDQ compatible data
#self.fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%10.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, phase, snr, chisq, sigmasq, freq, q, latency, int(rate), channel)
self.fdata += "%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag)
if not options.triggers_from_dataframe:
#self.fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%10.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, phase, snr, chisq, sigmasq, freq, q, latency, int(rate), channel)
self.fdata += "%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag)
# save dataframe compatible data
buftime_index = self.truncate_int(buftime, self.hdf_cadence)
try:
......@@ -314,6 +323,9 @@ class MultiChannelHandler(simplehandler.Handler):
def to_hdf5(self, channel, rate, buftime):
last_save_index = self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.hdf_cadence) - self.hdf_cadence
# check to make sure saving index only covers current dataframe index
if (last_save_index + self.hdf_cadence) % aggregator.MIN_TIME_QUANTA == 0:
last_save_index = last_save_index + self.hdf_cadence
# case 1: save current triggers from prev leaf directory, aggregate, reindex and clean out dataframe, move to new leaf directory
# FIXME: Doesn't save last 10 second chunk of triggers,
# and also has an indexing error in first 10 sec,
......@@ -322,7 +334,7 @@ class MultiChannelHandler(simplehandler.Handler):
# for now.
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], aggregator.MIN_TIME_QUANTA)) >= aggregator.MIN_TIME_QUANTA:
current_save_index = self.truncate_int(buftime, aggregator.MIN_TIME_QUANTA) - self.hdf_cadence
for save_index in numpy.arange(last_save_index, current_save_index, self.hdf_cadence):
for save_index in range(last_save_index, current_save_index, self.hdf_cadence):
try:
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
except KeyError:
......@@ -340,7 +352,7 @@ class MultiChannelHandler(simplehandler.Handler):
else:
# case 2: save current triggers
current_save_index = self.truncate_int(buftime, self.hdf_cadence) - self.hdf_cadence
for save_index in numpy.arange(last_save_index, current_save_index, self.hdf_cadence):
for save_index in range(last_save_index, current_save_index, self.hdf_cadence):
try:
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
except KeyError:
......@@ -355,7 +367,27 @@ class MultiChannelHandler(simplehandler.Handler):
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
except KeyError:
traceback.print_exc()
def to_dataframe_string(self, gps_time):
"""
Given a gps time, will write contents of
dataframe from last save time to gps time
to a string.
"""
df_data = ""
idx = pandas.IndexSlice
time_format = lambda x: '%20.9f' % x
col1_format = lambda x: '%10.3f' % x
col2_format = lambda x: '%8.3f' % x
trigger_format = {'start_time': time_format, 'stop_time': time_format, 'trigger_time': time_format, 'frequency': col1_format, 'phase': col2_format, 'sigmasq': col2_format, 'chisq': col2_format, 'snr': col1_format}
for (channel, rate) in self.keys:
channel_tag_str = ('%s_%i_%i' %(channel, int(rate)/4, int(rate)/2)).replace(":","_",1)
channel_tag = pandas.Series(numpy.array([channel_tag_str for i in range(self.last_save_time,gps_time + 1)]))
gps_index = self.truncate_int(gps_time, self.hdf_cadence)
if not self.dataframes[(channel, int(rate))].loc[idx[:, self.last_save_time:gps_time], :].dropna().empty:
df_data += self.dataframes[(channel, int(rate))].loc[idx[:, self.last_save_time:gps_time], :].assign(channel_tag = channel_tag.values).dropna().to_string(col_space = 3, header = False, index = False, formatters = trigger_format) + "\n"
return df_data
def to_dataframe_index(self, gps_time):
"""
Returns a two level index based on gps times
......@@ -390,7 +422,7 @@ class MultiChannelHandler(simplehandler.Handler):
assert n > 0
return (x / n) * n
def to_trigger_file(self):
def to_trigger_file(self, framedata = None):
# NOTE
# This method should only be called by an instance that is locked.
# Use T050017 filenaming convention.
......@@ -403,7 +435,10 @@ class MultiChannelHandler(simplehandler.Handler):
os.makedirs(path)
except OSError:
pass
data = self.header + self.fdata
if framedata is not None:
data = self.header + framedata
else:
data = self.header + self.fdata
with open(tmpfile, 'w') as f:
f.write(data)
shutil.move(tmpfile, fpath)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment