Skip to content
Snippets Groups Projects
Commit 352d76d6 authored by Duncan Meacher's avatar Duncan Meacher
Browse files

gstlal_idq_trigger_gen: Set output file to T050017 formate, added 5 diget sub-directories

parent 72d63fe7
No related branches found
No related tags found
No related merge requests found
...@@ -31,6 +31,8 @@ import os ...@@ -31,6 +31,8 @@ import os
import sys import sys
import resource import resource
import StringIO import StringIO
import threading
import shutil
import gi import gi
gi.require_version('Gst', '1.0') gi.require_version('Gst', '1.0')
...@@ -140,7 +142,7 @@ def parse_command_line(): ...@@ -140,7 +142,7 @@ def parse_command_line():
multichannel_datasource.append_options(parser) multichannel_datasource.append_options(parser)
parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .") parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
parser.add_option("--out-file", metavar = "filename", default = "output_triggers.trg", help = "Set the filename in which to save the output.") parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.") parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.") parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
...@@ -161,64 +163,79 @@ def parse_command_line(): ...@@ -161,64 +163,79 @@ def parse_command_line():
class MultiChannelHandler(simplehandler.Handler): class MultiChannelHandler(simplehandler.Handler):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.lock = threading.Lock()
self.durations = kwargs.pop("durations") self.durations = kwargs.pop("durations")
self.out_file = kwargs.pop("out_file") self.description = kwargs.pop("description")
self.out_path = kwargs.pop("out_path") self.out_path = kwargs.pop("out_path")
self.instrument = kwargs.pop("instrument")
self.last_save_time = None self.last_save_time = None
self.cadence = options.cadence self.cadence = options.cadence
# create header for trigger file # create header for trigger file
#self.header = "# start_time stop_time time frequency unnormalized_energy normalized_energy chisqdof significance channel\n" self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "latency", "rate", "channel")
self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "latency", "rate", "channel") self.fdata = ""
super(MultiChannelHandler, self).__init__(*args, **kwargs) super(MultiChannelHandler, self).__init__(*args, **kwargs)
def do_on_message(self, bus, message): def do_on_message(self, bus, message):
return False return False
def bufhandler(self, elem, sink_dict): def bufhandler(self, elem, sink_dict):
buf = elem.emit("pull-sample").get_buffer() with self.lock:
buftime = int(buf.pts / 1e9) buf = elem.emit("pull-sample").get_buffer()
if self.last_save_time is None: buftime = int(buf.pts / 1e9)
self.last_save_time = int(buftime) if self.last_save_time is None:
channel, rate = sink_dict[elem] self.last_save_time = buftime
duration = self.durations[(channel, float(rate))]
fdata = "" # Save triggers once every cadence
for i in range(buf.n_memory()): if (buftime - self.last_save_time) >= self.cadence:
memory = buf.peek_memory(i) self.to_trigger_file()
result, mapinfo = memory.map(Gst.MapFlags.READ) self.last_save_time = buftime
assert result self.fdata = ""
# NOTE NOTE NOTE NOTE
# It is critical that the correct class' channel, rate = sink_dict[elem]
# .from_buffer() method be used here. This duration = self.durations[(channel, float(rate))]
# code is interpreting the buffer's for i in range(buf.n_memory()):
# contents as an array of C structures and memory = buf.peek_memory(i)
# building instances of python wrappers of result, mapinfo = memory.map(Gst.MapFlags.READ)
# those structures but if the python assert result
# wrappers are for the wrong structure # NOTE NOTE NOTE NOTE
# declaration then terrible terrible things # It is critical that the correct class'
# will happen # .from_buffer() method be used here. This
if mapinfo.data: # code is interpreting the buffer's
for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data): # contents as an array of C structures and
trigger_time = row.end_time + row.end_time_ns * 1e-9 # building instances of python wrappers of
current_time = gpstime.gps_time_now().gpsSeconds + gpstime.gps_time_now().gpsNanoSeconds * 1e-9 # those structures but if the python
latency = numpy.round(current_time - buftime) # wrappers are for the wrong structure
start_time = trigger_time - duration/2 # declaration then terrible terrible things
stop_time = trigger_time + duration/2 # will happen
fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, row.phase, row.snr, row.chisq, row.sigmasq, latency, float(rate), channel) if mapinfo.data:
memory.unmap(mapinfo) for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data):
trigger_time = row.end_time + row.end_time_ns * 1e-9
# Save a "latest" current_time = gpstime.gps_time_now().gpsSeconds + gpstime.gps_time_now().gpsNanoSeconds * 1e-9
if (buftime - self.last_save_time) >= self.cadence: latency = numpy.round(current_time - buftime)
self.last_save_time = int(buftime) start_time = trigger_time - duration/2
self.to_trigger_file(os.path.join(self.out_path, "%d_%s" % (self.last_save_time, self.out_file)), fdata) stop_time = trigger_time + duration/2
self.fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, row.phase, row.snr, row.chisq, row.sigmasq, latency, float(rate), channel)
del buf memory.unmap(mapinfo)
return Gst.FlowReturn.OK
del buf
def to_trigger_file(self, path, data): return Gst.FlowReturn.OK
if not os.path.isfile(path):
data = self.header + data def to_trigger_file(self):
with open(path, 'a') as f: # NOTE
# This method should only be called by an instance that is locked.
# Use T050017 filenaming convention.
fname = '%s-%s-%d-%d.%s' % (self.instrument, self.description, self.last_save_time, self.cadence, "trg")
path = os.path.join(self.out_path, str(fname.split("-")[2])[:5])
fpath = os.path.join(path, fname)
tmpfile = fpath+"~"
try:
os.makedirs(path)
except OSError:
pass
data = self.header + self.fdata
with open(tmpfile, 'w') as f:
f.write(data) f.write(data)
shutil.move(tmpfile, fpath)
class LinkedAppSync(pipeparts.AppSync): class LinkedAppSync(pipeparts.AppSync):
...@@ -337,7 +354,7 @@ durations = {} ...@@ -337,7 +354,7 @@ durations = {}
# building the event loop and pipeline # building the event loop and pipeline
mainloop = GObject.MainLoop() mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0]) pipeline = Gst.Pipeline(sys.argv[0])
handler = MultiChannelHandler(mainloop, pipeline, durations = durations, out_file = options.out_file, out_path = options.out_path) handler = MultiChannelHandler(mainloop, pipeline, durations = durations, description = options.description, out_path = options.out_path, instrument = instrument)
# multiple channel src # multiple channel src
head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = options.verbose) head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = options.verbose)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment