diff --git a/bin/gstlal_inspiral b/bin/gstlal_inspiral index 0104e92551eb28c53df545064bd83ba69d11f0f6..7f2113693aa3a275fa15ea0b875ec62dc5a3e9e5 100755 --- a/bin/gstlal_inspiral +++ b/bin/gstlal_inspiral @@ -1,7 +1,6 @@ #!/usr/bin/env python # -# Copyright (C) 2010 Kipp Cannon, Chad Hanna -# Copyright (C) 2009 Kipp Cannon, Chad Hanna +# Copyright (C) 2009-2011 Kipp Cannon, Chad Hanna # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the @@ -30,6 +29,7 @@ import os import sys +import threading import warnings @@ -134,6 +134,36 @@ def parse_command_line(): return options, filenames, process_params +# +# ============================================================================= +# +# Misc +# +# ============================================================================= +# + + +# +# write the pipeline to a dot file. +# + + +def write_dump_dot(pipeline, filestem, verbose = False): + """ + This function needs the environment variable GST_DEBUG_DUMP_DOT_DIR + to be set. The filename will be + + os.path.join($GST_DEBUG_DUMP_DOT_DIR, filestem + ".dot") + + If verbose is True, a message will be written to stderr. + """ + if "GST_DEBUG_DUMP_DOT_DIR" not in os.environ: + raise ValueError, "cannot write pipeline, environment variable GST_DEBUG_DUMP_DOT_DIR is not set" + gst.DEBUG_BIN_TO_DOT_FILE(pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filestem) + if verbose: + print >>sys.stderr, "Wrote pipeline to %s" % os.path.join(os.environ["GST_DEBUG_DUMP_DOT_DIR"], "%s.dot" % filestem) + + # # ============================================================================= # @@ -151,6 +181,17 @@ def parse_command_line(): options, filenames, process_params = parse_command_line() +# +# Parse the veto file into segments if provided +# + + +if options.vetoes is not None: + veto_segments = ligolw_segments.segmenttable_get_by_name(utils.load_filename(options.vetoes), options.veto_segments_name).coalesce() +else: + veto_segments = None + + # # Import everything that depends on GStreamer # @@ -251,37 +292,12 @@ if options.online_data: psd = None -# -# How to write the pipeline to a dot file. This option needs the -# environment variable GST_DEBUG_DUMP_DOT_DIR to be set. There are several -# choices for the "details" (second argument). DEBUG_GRAPH_SHOW_ALL is the -# most verbose. -# - - -def maybe_dump_dot(pipeline, filestem, verbose = False): - if "GST_DEBUG_DUMP_DOT_DIR" not in os.environ: - raise ValueError, "Could not write pipeline, please set GST_DEBUG_DUMP_DOT_DIR in your environment" - gst.DEBUG_BIN_TO_DOT_FILE(pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filestem) - if verbose: - print >>sys.stderr, "Wrote pipeline to %s" % os.path.join(os.environ["GST_DEBUG_DUMP_DOT_DIR"], "%s.dot" % filestem) - - pipeline = gst.Pipeline("gstlal_inspiral") mainloop = gobject.MainLoop() handler = lloidparts.LLOIDHandler(mainloop, pipeline) -# -# Parse the veto file into segments if provided -# - -if options.vetoes is not None: - veto_segments = ligolw_segments.segmenttable_get_by_name(utils.load_filename(options.vetoes), options.veto_segments_name).coalesce() -else: - veto_segments = None - -src = lloidparts.mkLLOIDmulti( +triggersrc = lloidparts.mkLLOIDmulti( pipeline, seekevent, detectors = detectors, @@ -296,12 +312,14 @@ src = lloidparts.mkLLOIDmulti( nxydump_segment = options.nxydump_segment ) + # # build output document # + options.out_seg = segments.segment(options.seg[0]+max([b.filter_length for b in banks]), options.seg[1]) #FIXME make better outseg def. -data = ligolw_output.Data( +output = ligolw_output.Data( filename = options.output, process_params = process_params, ifos = set(detectors), @@ -313,17 +331,25 @@ data = ligolw_output.Data( verbose = options.verbose ) -def appsink_new_buffer(elem, data): - for row in sngl_inspirals_from_buffer(elem.get_property("last-buffer")): - if (row.end_time + 1e-9*row.end_time_ns) in data.out_seg: - row.process_id = data.process.process_id - row.event_id = data.sngl_inspiral_table.get_next_id() - data.sngl_inspiral_table.append(row) - if data.connection: - data.connection.commit() -sink = pipeparts.mkappsink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral")) -sink.connect_after("new-buffer", appsink_new_buffer, data) +def appsink_new_buffer(elem, output): + output.lock.acquire() + for row in sngl_inspirals_from_buffer(elem.emit("pull-buffer")): + if (row.end_time + 1e-9*row.end_time_ns) in output.out_seg: + row.process_id = output.process.process_id + row.event_id = output.sngl_inspiral_table.get_next_id() + output.sngl_inspiral_table.append(row) + if output.connection is not None: + output.connection.commit() + output.lock.release() + + +appsinks = set() +for pad in triggersrc.src_pads(): + sink = pipeparts.mkappsink(pipeline, triggersrc, pad = pad, caps = gst.Caps("application/x-lal-snglinspiral")) + sink.connect_after("new-buffer", appsink_new_buffer, output) + appsinks.add(sink) + if options.write_pipeline is not None: # @@ -333,17 +359,31 @@ if options.write_pipeline is not None: # the graph shows the final formats on all links # - class appsink_dump_dot_data(object): - pass - appsink_dump_dot_data = appsink_dump_dot_data() - appsink_dump_dot_data.pipeline = pipeline - appsink_dump_dot_data.filestem = "%s.%s" % (options.write_pipeline, "TRIGGERS") - appsink_dump_dot_data.verbose = options.verbose - appsink_dump_dot_data.object = sink - def appsink_dump_dot(elem, data): - maybe_dump_dot(data.pipeline, data.filestem, verbose = data.verbose) - data.object.disconnect(data.handler_id) - appsink_dump_dot_data.handler_id = sink.connect_after("new-buffer", appsink_dump_dot, appsink_dump_dot_data) + class AppsinkDumpDotData(object): + lock = threading.Lock() + n = 0 + write_after = len(appsinks) + pipeline = pipeline + filestem = "%s.%s" % (options.write_pipeline, "TRIGGERS") + verbose = options.verbose + + def __init__(self): + self.handler_id = None + + def execute(self): + self.lock.acquire() + self.n += 1 + if self.n >= self.write_after: + write_dump_dot(self.pipeline, self.filestem, verbose = self.verbose) + self.lock.release() + + def appsink_dump_dot(elem, appsink_dump_dot_data): + appsink_dump_dot_data.execute() + elem.disconnect(appsink_dump_dot_data.handler_id) + + for sink in appsinks: + appsink_dump_dot_data = AppsinkDumpDotData() + appsink_dump_dot_data.handler_id = sink.connect_after("new-buffer", appsink_dump_dot, appsink_dump_dot_data) # @@ -352,10 +392,10 @@ if options.write_pipeline is not None: if options.write_pipeline is not None: - maybe_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose) + write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose) pipeline.set_state(gst.STATE_PLAYING) if options.write_pipeline is not None: - maybe_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose) + write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose) mainloop.run() @@ -364,7 +404,8 @@ mainloop.run() # -data.write_output_file(verbose = options.verbose) +output.write_output_file(verbose = options.verbose) + # # done diff --git a/python/ligolw_output.py b/python/ligolw_output.py index e620e674c847dfc4f289638540f783d569f53b40..1d2d49e0220512408b2227aa33fd69a641217539 100644 --- a/python/ligolw_output.py +++ b/python/ligolw_output.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2010 +# Copyright (C) 2010-2011 # Kipp Cannon <kipp.cannon@ligo.org> # Chad Hanna <chad.hanna@ligo.org> # @@ -18,6 +18,8 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # + +import threading try: import sqlite3 except ImportError: @@ -120,6 +122,7 @@ def make_process_params(options): class Data(object): def __init__(self, filename, process_params, ifos, seg, out_seg, injection_filename = None, comment = None, tmp_path = None, verbose = False): + self.lock = threading.Lock() self.filename = filename xmldoc = ligolw.Document() xmldoc.appendChild(ligolw.LIGO_LW()) diff --git a/python/pipeparts.py b/python/pipeparts.py index f5a323079390752c8a7abd4e8ca98a4536d4f9b5..08bff560075c3adecd3659a78e6988af94d3e04b 100644 --- a/python/pipeparts.py +++ b/python/pipeparts.py @@ -503,7 +503,7 @@ def mkappsink(pipeline, src, pad = None, **properties): elem.set_property("async", False) elem.set_property("emit-signals", True) elem.set_property("max-buffers", 1) - elem.set_property("drop", True) + elem.set_property("drop", False) for name, value in properties.items(): elem.set_property(name, value) pipeline.add(elem)