Skip to content
Snippets Groups Projects
Commit 6574ecbf authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_idq_trigger_gen: added hdf file saving along with trigger aggregation...

gstlal_idq_trigger_gen: added hdf file saving along with trigger aggregation (max by specified cadence)
parent 7283cf65
No related branches found
No related tags found
No related merge requests found
......@@ -43,6 +43,8 @@ Gst.init(None)
import lal
import numpy
import pandas
from gstlal import pipeio
from gstlal import datasource
from gstlal import idq_multirate_datasource
......@@ -50,6 +52,7 @@ from gstlal import multichannel_datasource
from gstlal import sngltriggertable
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal import aggregator
from gstlal import httpinterface
from gstlal import bottle
from glue.ligolw import utils as ligolw_utils
......@@ -184,6 +187,9 @@ class MultiChannelHandler(simplehandler.Handler):
self.description = kwargs.pop("description")
self.out_path = kwargs.pop("out_path")
self.instrument = kwargs.pop("instrument")
self.keys = kwargs.pop("keys")
# iDQ saving properties
self.last_save_time = None
self.cadence = options.cadence
self.file_flag = False
......@@ -191,6 +197,18 @@ class MultiChannelHandler(simplehandler.Handler):
#self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%10s\t%10s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "frequency", "Q", "latency", "rate", "channel")
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "sigmasq", "chisq", "snr", "channel")
self.fdata = ""
# dataframe/hdf saving properties
self.last_hdf_save_time = dict.fromkeys(self.keys, None)
self.hdf_cadence = 10
self.reduction_cadence = 100
# dataframe properties
gps_now_time_quanta = aggregator.gps_to_minimum_time_quanta(int(aggregator.now()))
columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'phase', 'sigmasq', 'chisq', 'snr']
self.dataframes = {}
for (channel, rate) in self.keys:
self.dataframes[(channel, rate)] = pandas.DataFrame(numpy.nan, index = self.to_dataframe_index(gps_now_time_quanta), columns = columns)
# set up bottle routes for spectra by each whitener
self.psds = {}
......@@ -219,7 +237,6 @@ class MultiChannelHandler(simplehandler.Handler):
instrument, info = message.src.get_name().split("_", 1)
channel, _ = info.rsplit("_", 1)
psd = pipeio.parse_spectrum_message(message)
# save psd
self.psds[channel] = psd
return True
......@@ -229,10 +246,16 @@ class MultiChannelHandler(simplehandler.Handler):
with self.lock:
buf = elem.emit("pull-sample").get_buffer()
buftime = int(buf.pts / 1e9)
channel, rate = sink_dict[elem]
if self.last_save_time is None:
self.last_save_time = buftime
if self.last_hdf_save_time[(channel, int(rate))] is None:
self.last_hdf_save_time[(channel, int(rate))] = buftime
if not os.path.exists(self.to_agg_path(buftime, channel = channel, rate = rate)):
aggregator.makedir(self.to_agg_path(buftime, channel = channel, rate = rate))
# Save triggers once every cadence
# iDQ file saving
if ((buftime - self.last_save_time) >= self.cadence) and ( self.file_flag is True or buftime%self.cadence == 0):
# Use file_flag to check if buftime%cadence = 0.
# Flag is set after first time this is true.
......@@ -243,7 +266,11 @@ class MultiChannelHandler(simplehandler.Handler):
self.last_save_time = buftime
self.fdata = ""
channel, rate = sink_dict[elem]
# hdf file saving
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, int(rate))], self.hdf_cadence)) >= self.hdf_cadence:
self.to_hdf5(channel, int(rate), buftime)
self.last_hdf_save_time[(channel, int(rate))] = buftime
for i in range(buf.n_memory()):
memory = buf.peek_memory(i)
result, mapinfo = memory.map(Gst.MapFlags.READ)
......@@ -261,6 +288,7 @@ class MultiChannelHandler(simplehandler.Handler):
if mapinfo.data:
for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data):
trigger_time = row.end_time + row.end_time_ns * 1e-9
# FIXME: use aggregator.now() to grab current time
current_time = gpstime.gps_time_now().gpsSeconds + gpstime.gps_time_now().gpsNanoSeconds * 1e-9
latency = numpy.round(current_time - buftime)
freq, q, duration = self.basis_params[(channel, int(rate))][row.channel_index]
......@@ -269,13 +297,60 @@ class MultiChannelHandler(simplehandler.Handler):
# NOTE
# Setting stop time to trigger time for use with half sine gaussians
stop_time = trigger_time
#self.fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%10.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, row.phase, row.snr, row.chisq, row.sigmasq, freq, q, latency, int(rate), channel)
# save iDQ compatible data
#self.fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%10.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, phase, snr, chisq, sigmasq, freq, q, latency, int(rate), channel)
self.fdata += "%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag)
# save dataframe compatible data
buftime_index = self.truncate_int(buftime, self.hdf_cadence)
self.dataframes[(channel, int(rate))].loc[buftime_index, buftime] = numpy.array([start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr], dtype=float)
memory.unmap(mapinfo)
del buf
return Gst.FlowReturn.OK
def to_hdf5(self, channel, rate, buftime):
last_save_index = self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.hdf_cadence) - self.hdf_cadence
# case 1: save current triggers from prev leaf directory, aggregate, reindex and clean out dataframe, move to new directory
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], aggregator.MIN_TIME_QUANTA)) >= aggregator.MIN_TIME_QUANTA:
current_save_index = self.truncate_int(buftime, aggregator.MIN_TIME_QUANTA) - self.hdf_cadence
for save_index in numpy.arange(last_save_index, current_save_index, self.hdf_cadence):
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
max_index = self.dataframes[(channel, rate)].loc[last_save_index:current_save_index].groupby(level=0)['snr'].idxmax().values
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
self.dataframes[(channel, rate)].reindex(self.to_dataframe_index(buftime))
else:
# case 2: save current triggers
current_save_index = self.truncate_int(buftime, self.hdf_cadence) - self.hdf_cadence
for save_index in numpy.arange(last_save_index, current_save_index, self.hdf_cadence):
self.dataframes[(channel, rate)].loc[save_index].to_hdf(os.path.join(self.to_agg_path(save_index, channel, rate), '%d.h5' % save_index), 'data', format = 'table', mode = 'w')
# case 3: save current triggers from current directory and aggregate
if (buftime - self.truncate_int(self.last_hdf_save_time[(channel, rate)], self.reduction_cadence)) >= self.reduction_cadence:
max_index = self.dataframes[(channel, rate)].loc[last_save_index:current_save_index].groupby(level=0)['snr'].idxmax().values
self.dataframes[(channel, rate)].loc[max_index].to_hdf(os.path.join(self.to_agg_path(current_save_index, channel, rate), 'aggregates.h5'), 'max', format = 'table', mode = 'a', append = True)
def to_dataframe_index(self, gps_time):
index_t = numpy.arange(gps_time, gps_time + aggregator.MIN_TIME_QUANTA, dtype = numpy.int)
index_cadence = numpy.fromiter(( self.truncate_int(x, self.hdf_cadence) for x in index_t), dtype = numpy.int)
return pandas.MultiIndex.from_tuples(list(zip(index_cadence, index_t)), names = ['gps_time_cadence', 'gps_time'])
def to_agg_path(self, gps_time, channel, rate, level = 0):
path = options.out_path
if channel is not None:
path = os.path.join(path, channel)
path = os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = level))
if rate is not None:
path = os.path.join(path, str(rate).zfill(5))
return path
def truncate_int(self, x, n):
"""
Truncates an integer by positive powers of 10.
e.g. truncate_int(163, 10) = 160
"""
assert n > 0 and n % 10 == 0
return (x / n) * n
def to_trigger_file(self):
# NOTE
# This method should only be called by an instance that is locked.
......@@ -479,7 +554,6 @@ if options.verbose:
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
handler = MultiChannelHandler(mainloop, pipeline, basis_params = basis_params, description = options.description, out_path = options.out_path, instrument = instrument)
# multiple channel src
head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = options.verbose)
......@@ -528,6 +602,8 @@ for channel in channels:
if options.verbose:
print >>sys.stderr, "attaching appsinks to pipeline..."
handler = MultiChannelHandler(mainloop, pipeline, basis_params = basis_params, description = options.description, out_path = options.out_path, instrument = instrument, keys = src.keys())
appsync = LinkedAppSync(appsink_new_buffer = handler.bufhandler)
appsinks = set(appsync.add_sink(pipeline, src[(channel, rate)], name = "sink_%s_%s" % (rate, channel)) for (channel, rate) in src.keys())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment