diff --git a/gstlal-ugly/bin/gstlal_idq_trigger_gen b/gstlal-ugly/bin/gstlal_idq_trigger_gen index ec85de1eaa3e6f707c229eddb9ddd3085856b161..e2ab4470e6a4343a4f49fb359970310f1c5cc3b0 100755 --- a/gstlal-ugly/bin/gstlal_idq_trigger_gen +++ b/gstlal-ugly/bin/gstlal_idq_trigger_gen @@ -31,6 +31,8 @@ import os import sys import resource import StringIO +import threading +import shutil import gi gi.require_version('Gst', '1.0') @@ -140,7 +142,7 @@ def parse_command_line(): multichannel_datasource.append_options(parser) parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .") - parser.add_option("--out-file", metavar = "filename", default = "output_triggers.trg", help = "Set the filename in which to save the output.") + parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.") parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.") parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.") @@ -161,64 +163,79 @@ def parse_command_line(): class MultiChannelHandler(simplehandler.Handler): def __init__(self, *args, **kwargs): + self.lock = threading.Lock() self.durations = kwargs.pop("durations") - self.out_file = kwargs.pop("out_file") + self.description = kwargs.pop("description") self.out_path = kwargs.pop("out_path") + self.instrument = kwargs.pop("instrument") self.last_save_time = None self.cadence = options.cadence - # create header for trigger file - #self.header = "# start_time stop_time time frequency unnormalized_energy normalized_energy chisqdof significance channel\n" - self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "latency", "rate", "channel") + # create header for trigger file + self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "latency", "rate", "channel") + self.fdata = "" super(MultiChannelHandler, self).__init__(*args, **kwargs) def do_on_message(self, bus, message): return False def bufhandler(self, elem, sink_dict): - buf = elem.emit("pull-sample").get_buffer() - buftime = int(buf.pts / 1e9) - if self.last_save_time is None: - self.last_save_time = int(buftime) - channel, rate = sink_dict[elem] - duration = self.durations[(channel, float(rate))] - fdata = "" - for i in range(buf.n_memory()): - memory = buf.peek_memory(i) - result, mapinfo = memory.map(Gst.MapFlags.READ) - assert result - # NOTE NOTE NOTE NOTE - # It is critical that the correct class' - # .from_buffer() method be used here. This - # code is interpreting the buffer's - # contents as an array of C structures and - # building instances of python wrappers of - # those structures but if the python - # wrappers are for the wrong structure - # declaration then terrible terrible things - # will happen - if mapinfo.data: - for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data): - trigger_time = row.end_time + row.end_time_ns * 1e-9 - current_time = gpstime.gps_time_now().gpsSeconds + gpstime.gps_time_now().gpsNanoSeconds * 1e-9 - latency = numpy.round(current_time - buftime) - start_time = trigger_time - duration/2 - stop_time = trigger_time + duration/2 - fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, row.phase, row.snr, row.chisq, row.sigmasq, latency, float(rate), channel) - memory.unmap(mapinfo) - - # Save a "latest" - if (buftime - self.last_save_time) >= self.cadence: - self.last_save_time = int(buftime) - self.to_trigger_file(os.path.join(self.out_path, "%d_%s" % (self.last_save_time, self.out_file)), fdata) - - del buf - return Gst.FlowReturn.OK - - def to_trigger_file(self, path, data): - if not os.path.isfile(path): - data = self.header + data - with open(path, 'a') as f: + with self.lock: + buf = elem.emit("pull-sample").get_buffer() + buftime = int(buf.pts / 1e9) + if self.last_save_time is None: + self.last_save_time = buftime + + # Save triggers once every cadence + if (buftime - self.last_save_time) >= self.cadence: + self.to_trigger_file() + self.last_save_time = buftime + self.fdata = "" + + channel, rate = sink_dict[elem] + duration = self.durations[(channel, float(rate))] + for i in range(buf.n_memory()): + memory = buf.peek_memory(i) + result, mapinfo = memory.map(Gst.MapFlags.READ) + assert result + # NOTE NOTE NOTE NOTE + # It is critical that the correct class' + # .from_buffer() method be used here. This + # code is interpreting the buffer's + # contents as an array of C structures and + # building instances of python wrappers of + # those structures but if the python + # wrappers are for the wrong structure + # declaration then terrible terrible things + # will happen + if mapinfo.data: + for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data): + trigger_time = row.end_time + row.end_time_ns * 1e-9 + current_time = gpstime.gps_time_now().gpsSeconds + gpstime.gps_time_now().gpsNanoSeconds * 1e-9 + latency = numpy.round(current_time - buftime) + start_time = trigger_time - duration/2 + stop_time = trigger_time + duration/2 + self.fdata += "%20.9f\t%20.9f\t%20.9f\t%6.3f\t%8.3f\t%8.3f\t%8.3f\t%9d\t%8.1f\t%s\n" % (start_time, stop_time, trigger_time, row.phase, row.snr, row.chisq, row.sigmasq, latency, float(rate), channel) + memory.unmap(mapinfo) + + del buf + return Gst.FlowReturn.OK + + def to_trigger_file(self): + # NOTE + # This method should only be called by an instance that is locked. + # Use T050017 filenaming convention. + fname = '%s-%s-%d-%d.%s' % (self.instrument, self.description, self.last_save_time, self.cadence, "trg") + path = os.path.join(self.out_path, str(fname.split("-")[2])[:5]) + fpath = os.path.join(path, fname) + tmpfile = fpath+"~" + try: + os.makedirs(path) + except OSError: + pass + data = self.header + self.fdata + with open(tmpfile, 'w') as f: f.write(data) + shutil.move(tmpfile, fpath) class LinkedAppSync(pipeparts.AppSync): @@ -337,7 +354,7 @@ durations = {} # building the event loop and pipeline mainloop = GObject.MainLoop() pipeline = Gst.Pipeline(sys.argv[0]) -handler = MultiChannelHandler(mainloop, pipeline, durations = durations, out_file = options.out_file, out_path = options.out_path) +handler = MultiChannelHandler(mainloop, pipeline, durations = durations, description = options.description, out_path = options.out_path, instrument = instrument) # multiple channel src head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = options.verbose)