Commit b98eb70b authored by Patrick Godwin

gstlal_idq_trigger_gen: performance tweaks for throughput

parent 876fbe67
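The core of this change replaces a continually growing trigger string (`self.fdata += ...`) with a bounded `collections.deque` that is appended to per row and joined once at file-writing time. Below is a minimal, stand-alone sketch of the two buffering patterns; it is not part of the commit, and `n_rows` plus the sample row are illustrative only.

```python
# Sketch comparing the two buffering strategies this commit switches between.
# Repeated str concatenation may reallocate and copy the buffer as it grows,
# while deque.append is O(1) and the rows are concatenated once at flush time.
import timeit
from collections import deque

row = "%20.9f\t%10.3f\t%s\n" % (1187008882.44, 128.0, "H1:FAKE-CHANNEL_16_32")

def concat_str(n_rows):
    fdata = ""
    for _ in range(n_rows):
        fdata += row  # immutable strings: growth can trigger copies
    return fdata

def append_deque(n_rows):
    fdata = deque(maxlen = 25000)  # bounded, matching the cap used in the commit
    for _ in range(n_rows):
        fdata.append(row)  # cheap append
    return "".join(fdata)  # single concatenation pass at write time

if __name__ == "__main__":
    for fn in (concat_str, append_deque):
        print("%s: %.3f s" % (fn.__name__, timeit.timeit(lambda: fn(20000), number = 10)))
```

The `maxlen` also caps memory on a long-running stream if a flush is ever delayed, at the cost of silently dropping the oldest rows once the cap is reached.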
@@ -208,7 +208,7 @@ class MultiChannelHandler(simplehandler.Handler):
 			self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "sigmasq", "chisq", "snr", "channel", "latency")
 		else:
 			self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "sigmasq", "chisq", "snr", "channel")
-		self.fdata = ""
+		self.fdata = deque(maxlen = 25000)
 		# dataframe/hdf saving properties
 		self.tag = '%s-%s' % (self.instrument[:1], self.description)
@@ -290,14 +290,14 @@ class MultiChannelHandler(simplehandler.Handler):
 		if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence):
 			if options.triggers_from_dataframe:
 				self.to_dataframe_string(buftime)
-			self.to_trigger_file()
-			self.fdata = ""
+			self.to_trigger_file(buftime)
+			self.fdata.clear()
 			self.last_save_time = buftime
 		# save current triggers in dataframe before dataframe is reindexed
 		if options.triggers_from_dataframe and idq_aggregator.in_new_epoch(buftime, self.last_save_time, aggregator.MIN_TIME_QUANTA):
 			self.to_dataframe_string(buftime)
-			self.to_trigger_file()
+			self.to_trigger_file(buftime)
 			self.last_save_time = buftime
 		# hdf file saving
@@ -339,7 +339,8 @@ class MultiChannelHandler(simplehandler.Handler):
 		time, will process a row from a gstreamer buffer.
 		"""
 		trigger_time = row.end_time + row.end_time_ns * 1e-9
-		latency = numpy.round(int(aggregator.now()) - buftime)
+		if options.latency:
+			latency = numpy.round(int(aggregator.now()) - buftime)
 		freq, q, duration = self.basis_params[(channel, rate)][row.channel_index]
 		start_time = trigger_time - duration
 		channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)
@@ -349,9 +350,9 @@ class MultiChannelHandler(simplehandler.Handler):
 		# save iDQ compatible data
 		if not options.triggers_from_dataframe:
 			if options.latency:
-				self.fdata += "%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag, latency)
+				self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag, latency))
 			else:
-				self.fdata += "%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag)
+				self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag))
 		# save dataframe compatible data
 		else:
 			try:
@@ -434,7 +435,7 @@ class MultiChannelHandler(simplehandler.Handler):
 		# fast concatenation to do the above:
 		self.fdata = self.fdata.join([(idq_aggregator.get_dataframe_subset(self.last_save_time, gps_time, self.dataframes[(channel, rate)]).assign(channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)).dropna().to_string(header = False, index = False, formatters = trigger_format) + "\n") for (channel, rate) in self.keys if not idq_aggregator.get_dataframe_subset(self.last_save_time, gps_time, self.dataframes[(channel, rate)]).dropna().empty])
-	def to_trigger_file(self):
+	def to_trigger_file(self, buftime):
 		# NOTE
 		# This method should only be called by an instance that is locked.
 		# Use T050017 filenaming convention.
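`to_trigger_file()` now takes the `buftime` that triggered the flush so the write stage can report its own latency (see the next hunk). A hedged, stand-alone sketch of that bookkeeping, with `gps_now` standing in for `aggregator.now()`, which is assumed here, not verified, to return the current time in GPS seconds:

```python
# Standalone sketch of the write-stage latency report added in this commit.
import sys
import numpy

def write_stage_latency(buftime, gps_now):
    # same arithmetic as the added lines: gap between "now" and the
    # GPS time of the buffer that triggered this flush, rounded to seconds
    return numpy.round(int(gps_now) - buftime)

latency = write_stage_latency(1187008882, 1187008885.7)
sys.stderr.write("buftime = %d, latency at write stage = %d\n" % (1187008882, latency))
```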
@@ -446,11 +447,11 @@ class MultiChannelHandler(simplehandler.Handler):
 			os.makedirs(path)
 		except OSError:
 			pass
 		#data = self.header + self.fdata
 		with open(tmpfile, 'w') as f:
 			#f.write(data)
-			f.write(self.fdata)
+			f.write(self.header.join(self.fdata))
 		shutil.move(tmpfile, fpath)
+		latency = numpy.round(int(aggregator.now()) - buftime)
+		print >>sys.stderr, "buftime = %d, latency at write stage = %d" % (buftime, latency)
 	def gen_psd_xmldoc(self):
 		xmldoc = lal.series.make_psd_xmldoc(self.psds)
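One note on the new write path for readers of this diff: `str.join` places its receiver between every element of the sequence, so `self.header.join(self.fdata)` interleaves the header between rows rather than writing it once at the top. A short illustration of the difference, using made-up sample strings:

```python
# Illustration of str.join semantics relevant to f.write(self.header.join(self.fdata)).
header = "# start_time\tsnr\tchannel\n"
rows = ["1187008882.4\t7.3\tH1:CHAN_A\n", "1187008883.1\t5.9\tH1:CHAN_B\n"]

print(header.join(rows))        # header appears between rows, not before the first one
print(header + "".join(rows))   # header written once, followed by all rows
```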
@@ -503,7 +504,7 @@ class LinkedAppSync(pipeparts.AppSync):
 		# retrieve the timestamps of all elements that
 		# aren't at eos and all elements at eos that still
 		# have buffers in them
-		timestamps = [(t, e) for e, t in self.appsinks.items() if e not in self.at_eos or t is not None]
+		timestamps = [(t, e) for e, t in self.appsinks.iteritems() if e not in self.at_eos or t is not None]
 		# if all elements are at eos and none have buffers,
 		# then we're at eos
 		if not timestamps:
@@ -531,7 +532,7 @@ class LinkedAppSync(pipeparts.AppSync):
 		# that aren't at eos and all elements at eos that still
 		# have buffers in them
 		channel = self.sink_dict[elem][0]
-		timestamps = [(t, e) for e, t in self.appsinks.items() if self.sink_dict[e][0] == channel and (e not in self.at_eos or t is not None)]
+		timestamps = [(t, e) for e, t in self.appsinks.iteritems() if self.sink_dict[e][0] == channel and (e not in self.at_eos or t is not None)]
 		# if all elements are at eos and none have buffers,
 		# then we're at eos
 		if not timestamps:
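The remaining tweak swaps `dict.items()` for `dict.iteritems()` in the appsink polling loops. Under Python 2, which this code targets given the `print >>` syntax, `items()` builds a full list of `(key, value)` pairs on every call, while `iteritems()` yields them lazily; in a loop that runs for every buffer this avoids a per-call allocation. A small Python 2 sketch with a made-up `appsinks` dict:

```python
# Python 2 sketch: items() materializes a list each call, iteritems() does not.
appsinks = dict(("sink_%04d" % i, None) for i in range(1000))

pairs_list = appsinks.items()      # new 1000-element list on every poll
pairs_iter = appsinks.iteritems()  # lazy iterator, no intermediate list

# same comprehension shape as the polling code above
timestamps = [(t, e) for e, t in appsinks.iteritems() if t is not None]
```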