From 1a52d18e0fbf115ea24eea9cea30754bd9945dc1 Mon Sep 17 00:00:00 2001
From: Kipp Cannon <kipp.cannon@ligo.org>
Date: Tue, 5 Apr 2011 14:56:05 -0700
Subject: [PATCH] big clean up

a few bugs are fixed in this:

- a nofakedisconts was missing following the autochisq
- the block size wasn't being propagated into the framesrc
- some commented-out eye candy code wouldn't have worked (it still might not, but it should)
- etc.
---
 bin/gstlal_inspiral  |  79 ++++----
 python/lloidparts.py | 421 ++++++++++++++++++++++---------------------
 python/misc.py       |   1 -
 3 files changed, 249 insertions(+), 252 deletions(-)

diff --git a/bin/gstlal_inspiral b/bin/gstlal_inspiral
index 4b323529c4..5db7bd9efc 100755
--- a/bin/gstlal_inspiral
+++ b/bin/gstlal_inspiral
@@ -28,28 +28,27 @@
 #
 
 
+import os
+import sys
+import warnings
+
+
 import pygtk
 pygtk.require('2.0')
 import pygst
 pygst.require('0.10')
 from gstlal.option import OptionParser
 
-import sys
-import os.path
-import os
-import warnings
 
 from glue import segments
 from glue import segmentsUtils
-import glue.ligolw.utils
-import glue.ligolw.utils.segments as ligolw_segments
 from glue.ligolw import utils
+from glue.ligolw.utils import segments as ligolw_segments
 from pylal.datatypes import LIGOTimeGPS
 from gstlal import ligolw_output
 from pylal.xlal.datatypes.snglinspiraltable import from_buffer as sngl_inspirals_from_buffer
 
 
-
 #
 # =============================================================================
 #
@@ -96,32 +95,30 @@ def parse_command_line():
 		import gstoption
 		parser.add_option_group(gstoption.get_group())
 	except:
-		warnings.warn("Failed to get GStreamer's option group, not adding command line options for it")
+		warnings.warn("failed to get GStreamer's option group, not adding command line options for it")
 
 	options, filenames = parser.parse_args()
 
-	if sum(1 for option in ('frame_cache', 'fake_data', 'online_data') if getattr(options, option) is not None) != 1:
+	if len([option for option in ('frame_cache', 'fake_data', 'online_data') if getattr(options, option) is not None]) != 1:
 		raise ValueError, "must provide exactly one of --frame-cache, --fake-data, --online-data"
 
 	required_options = ["instrument", "output"]
-
 	if options.frame_cache:
 		required_options += ["channel_name", "gps_start_time", "gps_end_time"]
-
 	if options.online_data:
 		required_options += ["reference_psd"]
 
+	missing_options = []
 	if len(options.template_bank) + len(options.svd_bank) == 0:
-		raise SystemExit, "Must provide at least one --template-bank or --svd-bank option."
+		missing_options += ["--template-bank or --svd-bank"]
+	missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
+	if missing_options:
+		raise ValueError, "missing required option(s) %s" % ", ".join(sorted(missing_options))
 
 	# FIXME: should also check for read permissions
-	for bankname in options.template_bank + options.svd_bank:
-		if not os.path.exists(bankname):
-			raise SystemExit, "Template bank file %s does not exist." % bankname
-
-	missing_options = [option for option in required_options if getattr(options, option) is None]
-	if missing_options:
-		raise ValueError, "missing required option(s) %s" % ", ".join("--%s" % option.replace("_", "-") for option in sorted(missing_options))
+	missing_files = [filename for filename in options.template_bank + options.svd_bank if not os.path.exists(filename)]
+	if missing_files:
+		raise ValueError, "files %s do not exist" % ", ".join("'%s'" % filename for filename in sorted(missing_files))
 
 	# do this before converting option types
 	process_params = ligolw_output.make_process_params(options)
@@ -168,15 +165,16 @@ import pygst
 pygst.require('0.10')
 import gst
 
+
 from gstlal import pipeparts
 from gstlal import lloidparts
 from gstlal import templates
-from gstlal.svd_bank import build_bank, read_bank
+from gstlal import svd_bank
 from gstlal import reference_psd
 
 
 #
-# construct pipeline metadata and measure the PSD
+# construct pipeline metadata and measure or retrieve the PSD
 #
 
 
@@ -205,15 +203,11 @@ detectors = {
 }
 
 
-#
-# FIXME
-# right now vetoes are applied after whitening.  If that
-# changes this function will need to know about vetoes too
-#
-
 if options.reference_psd is not None:
 	psd = reference_psd.read_psd(options.reference_psd, verbose = options.verbose)
 else:
+	# FIXME right now vetoes are applied after whitening.  If that
+	# changes this function will need to know about vetoes too
 	psd = measure_psd(
 		options.instrument,
 		seekevent,
@@ -229,24 +223,24 @@ else:
 	if options.write_psd is not None:
 		write_psd(options.write_psd, psd, verbose = options.verbose)
 
+
 #
 # Make template banks
 #
 
-banks = [
-	build_bank(filename, psd, options.flow, options.ortho_gate_fap, options.snr_threshold, options.svd_tolerance, verbose = options.verbose)
-	for filename in options.template_bank
-] + [
-	read_bank(filename)
-	for filename in options.svd_bank
-]
+
+banks = []
+for filename in options.template_bank:
+	banks.append(svd_bank.build_bank(filename, psd, options.flow, options.ortho_gate_fap, options.snr_threshold, options.svd_tolerance, verbose = options.verbose))
+for filename in options.svd_bank:
+	banks.append(svd_bank.read_bank(filename))
+
 for n, bank in enumerate(banks):
 	bank.logname = "bank%d" % n
 
 
-
 #
-# build pipeline
+# Build pipeline
 #
 
 
@@ -258,11 +252,13 @@ if options.online_data:
 
 
 #
-# Write the pipeline to a dot file.
-# This option needs the environment variable GST_DEBUG_DUMP_DOT_DIR
-# to be set. There are several choices for the "details"
-# (second argument). DEBUG_GRAPH_SHOW_ALL is the most verbose.
+# How to write the pipeline to a dot file.  This option needs the
+# environment variable GST_DEBUG_DUMP_DOT_DIR to be set. There are several
+# choices for the "details" (second argument). DEBUG_GRAPH_SHOW_ALL is the
+# most verbose.
 #
+
+
 def maybe_dump_dot(pipeline, filestem, verbose = False):
 	if "GST_DEBUG_DUMP_DOT_DIR" not in os.environ:
 		raise ValueError, "Could not write pipeline, please set GST_DEBUG_DUMP_DOT_DIR in your environment"
@@ -275,6 +271,7 @@ pipeline = gst.Pipeline("gstlal_inspiral")
 mainloop = gobject.MainLoop()
 handler = lloidparts.LLOIDHandler(mainloop, pipeline)
 
+
 #
 # Parse the veto file into segments if provided
 #
@@ -316,7 +313,7 @@ def appsink_new_buffer(elem, data):
 	if data.connection:
 		data.connection.commit()
 
-sink = pipeparts.mkappsink(pipeline, src, caps = gst.Caps("application/x-lal-snglinspiral"))
+sink = pipeparts.mkappsink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"))
 sink.connect_after("new-buffer", appsink_new_buffer, data)
 
 if options.write_pipeline is not None:
diff --git a/python/lloidparts.py b/python/lloidparts.py
index ba98c22866..052c45ef24 100644
--- a/python/lloidparts.py
+++ b/python/lloidparts.py
@@ -1,5 +1,4 @@
-# Copyright (C) 2010  Kipp Cannon, Chad Hanna
-# Copyright (C) 2009  Kipp Cannon, Chad Hanna
+# Copyright (C) 2009--2011  Kipp Cannon, Chad Hanna
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the GNU General Public License as published by the
@@ -26,11 +25,10 @@
 
 
 from gstlal import pipeparts
-from gstlal import pipeio
 from gstlal import cbc_template_fir
 import math
 import sys
-import numpy
+
 
 # The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
 import pygtk
@@ -41,32 +39,6 @@ import pygst
 pygst.require('0.10')
 import gst
 
-def mkelems_fast(bin, *pipedesc):
-	elems = []
-	elem = None
-	for arg in pipedesc:
-		if isinstance(arg, dict):
-			for tup in arg.iteritems():
-				elem.set_property(*tup)
-		else:
-			if elem is not None:
-				if elem.get_parent() is None:
-					bin.add(elem)
-				if len(elems) > 0:
-					elems[-1].link(elem)
-				elems.append(elem)
-			if isinstance(arg, gst.Element):
-				elem = arg
-			else:
-				elem = gst.element_factory_make(arg)
-	if elem is not None:
-		if elem.get_parent() is None:
-			bin.add(elem)
-		if len(elems) > 0:
-			elems[-1].link(elem)
-		elems.append(elem)
-	return elems
-
 
 #
 # =============================================================================
@@ -78,7 +50,7 @@ def mkelems_fast(bin, *pipedesc):
 
 
 class DetectorData(object):
-	# default block_size = 16384 samples/second * 8 bytes/sample * 8
+	# default block_size = 16384 samples/second * 8 bytes/sample * 512
 	# second
 	def __init__(self, frame_cache, channel, block_size = 16384 * 8 * 512):
 		self.frame_cache = frame_cache
@@ -139,8 +111,7 @@ def seek_event_for_gps(gps_start_time, gps_end_time, flags = 0):
 	start_type, start_time = seek_args_for_gps(gps_start_time)
 	stop_type, stop_time   = seek_args_for_gps(gps_end_time)
 
-	return gst.event_new_seek(1., gst.FORMAT_TIME, flags,
-		start_type, start_time, stop_type, stop_time)
+	return gst.event_new_seek(1., gst.FORMAT_TIME, flags, start_type, start_time, stop_type, stop_time)
 
 
 #
@@ -149,61 +120,86 @@ def seek_event_for_gps(gps_start_time, gps_end_time, flags = 0):
 
 
 def mkcontrolsnksrc(pipeline, rate, verbose = False, suffix = None):
-	args = (
-		"lal_adder", {"sync": True},
-		"capsfilter", {"caps": gst.Caps("audio/x-raw-float, rate=%d" % rate)}
-	)
+	snk = gst.element_factory_make("lal_adder")
+	snk.set_property("sync", True)
+	pipeline.add(snk)
+	src = pipeparts.mkcapsfilter(pipeline, snk, "audio/x-raw-float, rate=%d" % rate)
 	if verbose:
-		args += ("progressreport", {"name": "progress_sumsquares%s" % (suffix and "_%s" % suffix or "")})
-	args += ("tee",)
-
-	elems = mkelems_fast(pipeline, *args)
-	return elems[0], elems[-1]
+		src = pipeparts.mkprogressreport(pipeline, src, "progress_sumsquares%s" % (suffix and "_%s" % suffix or ""))
+	src = pipeparts.mktee(pipeline, src)
+	return snk, src
 
 
 #
 # data source
 #
 
+
 def mkLLOIDbasicsrc(pipeline, seekevent, instrument, detector, fake_data = False, online_data = False, injection_filename = None, verbose = False):
 	#
-	# data source and progress report
+	# data source
 	#
 
 	if fake_data:
-		args = ("lal_fakeligosrc", {"instrument": instrument, "channel-name": detector.channel, "blocksize": detector.block_size})
+		assert not online_data
+		src = pipeparts.mkfakeLIGOsrc(pipeline, instrument = instrument, channel_name = detector.channel, blocksize = detector.block_size)
 	elif online_data:
-		args = ("lal_onlinehoftsrc", {"instrument": instrument})
+		assert not fake_data
+		src = pipeparts.mkonlinehoftsrc(pipeline, instrument)
 	else:
-		args = ("lal_framesrc", {"instrument": instrument, "blocksize": detector.block_size, "location": detector.frame_cache, "channel-name": detector.channel})
-	args += ("audioconvert",)
+		src = pipeparts.mkframesrc(pipeline, location = detector.frame_cache, instrument = instrument, channel_name = detector.channel, blocksize = detector.block_size)
+
+	#
+	# seek the data source
+	#
+
+	if src.set_state(gst.STATE_READY) != gst.STATE_CHANGE_SUCCESS:
+		raise RuntimeError, "Element %s did not want to enter ready state" % src.get_name()
+	if not src.send_event(seekevent):
+		raise RuntimeError, "Element %s did not handle seek event" % src.get_name()
+
+	#
+	# convert single precision streams to double precision if needed
+	#
+
+	src = pipeparts.mkaudioconvert(pipeline, src)
+
+	#
+	# progress report
+	#
 
 	if verbose:
-		args += ("progressreport", {"name": "progress_src_%s" % instrument})
+		src = pipeparts.mkprogressreport(pipeline, src, "progress_src_%s" % instrument)
 
-	if injection_filename is not None:
-		args += ("lal_simulation", {"xml-location": injection_filename})
+	#
+	# optional injections
+	#
 
-	elems = mkelems_fast(pipeline, *args)
+	if injection_filename is not None:
+		src = pipeparts.mkinjections(pipeline, src, injection_filename)
 
-	if elems[0].set_state(gst.STATE_READY) != gst.STATE_CHANGE_SUCCESS:
-		raise RuntimeError, "Element %s did not want to enter ready state" % elems[0].get_name()
-	if not elems[0].send_event(seekevent):
-		raise RuntimeError, "Element %s did not handle seek event" % elems[0].get_name()
+	#
+	# done
+	#
 
-	return elems[-1]
+	return src
 
 
-def mkLLOIDsrc(pipeline, src, rates, psd=None, psd_fft_length = 8, veto_segments = None, seekevent = None):
+def mkLLOIDsrc(pipeline, src, rates, psd=None, psd_fft_length = 8, veto_segments = None, seekevent = None, nxydump_segment = None):
 	"""Build pipeline stage to whiten and downsample h(t)."""
 
 	#
-	# down-sample to highest of target sample rates.  note:  there is
-	# no check that this is, infact, *down*-sampling.  if the source
-	# time series has a lower sample rate this will up-sample the data.
-	# up-sampling will probably interact poorly with the whitener as it
-	# will likely add (possibly significant) numerical noise when it
-	# amplifies the non-existant high-frequency components
+	# down-sample to highest of target sample rates.  we include a caps
+	# filter upstream of the resampler to ensure that this is, in fact,
+	# *down*-sampling.  if the source time series has a lower sample
+	# rate than the highest target sample rate the resampler will
+	# become an upsampler, and the result will likely interact poorly
+	# with the whitener as it tries to amplify the non-existent
+	# high-frequency components, possibly adding significant numerical
+	# noise to its output.  if you see errors about being unable to
+	# negotiate a format from this stage in the pipeline, it is because
+	# you are asking for output sample rates that are higher than the
+	# sample rate of your data source.
 	#
 	# note that when downsampling it might be possible for the
 	# audioresampler to produce spurious DISCONT flags.  if it receives
@@ -216,26 +212,29 @@ def mkLLOIDsrc(pipeline, src, rates, psd=None, psd_fft_length = 8, veto_segments
 	# might ever do it
 	#
 
-	source_rate = max(rates)
-	elems = mkelems_fast(pipeline,
-		src,
-		"audioresample", {"quality": 9},
-		"capsfilter", {"caps": gst.Caps("audio/x-raw-float, rate=%d" % source_rate)},
-		"lal_whiten", {"fft-length": psd_fft_length, "zero-pad": 0, "average-samples": 64, "median-samples": 7},
-		"lal_nofakedisconts", {"silent": True}
-	)
+	quality = 9
+	head = pipeparts.mkcapsfilter(pipeline, src, "audio/x-raw-float, rate=[%d,MAX]" % max(rates))
+	head = pipeparts.mkresample(pipeline, head, quality = quality)
+	head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw-float, rate=%d" % max(rates))
 
+	#
+	# construct whitener.  this element must be followed by a
+	# nofakedisconts element.
+	#
+
+	head = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = 0, average_samples = 64, median_samples = 7)
 	if psd is None:
 		# use running average PSD
-		elems[-2].set_property("psd-mode", 0)
+		head.set_property("psd-mode", 0)
 	else:
 		# use fixed PSD
-		elems[-2].set_property("psd-mode", 1)
+		head.set_property("psd-mode", 1)
 
 		#
-		# install signal handler to retrieve \Delta f when it is
-		# known, resample the user-supplied PSD, and install it
-		# into the whitener.
+		# install signal handler to retrieve \Delta f and
+		# f_{Nyquist} whenever they are known and/or change,
+		# resample the user-supplied PSD, and install it into the
+		# whitener.
 		#
 
 		def psd_resolution_changed(elem, pspec, psd):
@@ -246,12 +245,14 @@ def mkLLOIDsrc(pipeline, src, rates, psd=None, psd_fft_length = 8, veto_segments
 			psd = cbc_template_fir.interpolate_psd(psd, delta_f)
 			elem.set_property("mean-psd", psd.data[:n])
 
-		elems[-2].connect_after("notify::f-nyquist", psd_resolution_changed, psd)
-		elems[-2].connect_after("notify::delta-f", psd_resolution_changed, psd)
-
-	head = elems[-1]
+		head.connect_after("notify::f-nyquist", psd_resolution_changed, psd)
+		head.connect_after("notify::delta-f", psd_resolution_changed, psd)
+	head = pipeparts.mknofakedisconts(pipeline, head, silent = True)
 
+	#
 	# optionally add vetoes
+	#
+
 	if veto_segments is not None:
 		segsrc = pipeparts.mksegmentsrc(pipeline, veto_segments, invert_output=True)
 		if seekevent is not None:
@@ -262,9 +263,12 @@ def mkLLOIDsrc(pipeline, src, rates, psd=None, psd_fft_length = 8, veto_segments
 		q = pipeparts.mkqueue(pipeline, segsrc, max_size_buffers=0, max_size_bytes=0, max_size_time=(1 * gst.SECOND))
 		head = pipeparts.mkgate(pipeline, head, threshold = 0.1, control = q)
 
-	# put in the final tee
-	elems = mkelems_fast(pipeline, head, "tee")
-	
+	#
+	# tee for highest sample rate stream
+	#
+
+	head = {max(rates): pipeparts.mktee(pipeline, head)}
+
 	#
 	# down-sample whitened time series to remaining target sample rates
 	# while applying an amplitude correction to adjust for low-pass
@@ -290,17 +294,23 @@ def mkLLOIDsrc(pipeline, src, rates, psd=None, psd_fft_length = 8, veto_segments
 	# adjust for the reduction in variance due to the downsampler.
 	#
 
-	quality = 9
-	head = {source_rate: elems[-1]}
-	for rate in sorted(rates, reverse = True)[1:]:	# all but the highest rate
-		head[rate] = mkelems_fast(pipeline,
-			elems[-1],
-			"audioamplify", {"clipping-method": 3, "amplification": 1/math.sqrt(pipeparts.audioresample_variance_gain(quality, source_rate, rate))},
-			"audioresample", {"quality": quality},
-			"capsfilter", {"caps": gst.Caps("audio/x-raw-float, rate=%d" % rate)},
-			"lal_checktimestamps", {"name": "after_downsample_to_%d" % rate},
-			"tee"
-		)[-1]
+	for rate in sorted(set(rates))[:-1]:	# all but the highest rate
+		head[rate] = pipeparts.mktee(
+			pipeline,
+			pipeparts.mkcapsfilter(
+				pipeline,
+				pipeparts.mkresample(
+					pipeline,
+					pipeparts.mkaudioamplify(
+						pipeline,
+						head[max(rates)],
+						1/math.sqrt(pipeparts.audioresample_variance_gain(quality, max(rates), rate))
+					),
+					quality = quality
+				),
+				caps = "audio/x-raw-float, rate=%d" % rate
+			)
+		)
 
 	#
 	# done.  return value is a dictionary of tee elements indexed by
@@ -317,7 +327,7 @@ def mkLLOIDsrc(pipeline, src, rates, psd=None, psd_fft_length = 8, veto_segments
 #
 
 
-def mkLLOIDbranch(pipeline, src, bank, bank_fragment, (control_snk, control_src), gate_attack_length, gate_hold_length):
+def mkLLOIDbranch(pipeline, src, bank, bank_fragment, (control_snk, control_src), gate_attack_length, gate_hold_length, nxydump_segment = None):
 	logname = "%s_%d_%d" % (bank.logname, bank_fragment.start, bank_fragment.end)
 
 	#
@@ -328,13 +338,11 @@ def mkLLOIDbranch(pipeline, src, bank, bank_fragment, (control_snk, control_src)
 	# need to be here, or it might be a symptom of a bug elsewhere.
 	# figure this out.
 
-	src = mkelems_fast(pipeline,
-		src,
-		"lal_firbank", {"block-length-factor": 10, "latency": -int(round(bank_fragment.start * bank_fragment.rate)) - 1, "fir-matrix": bank_fragment.orthogonal_template_bank},
-		"lal_nofakedisconts", {"silent": True},
-		"lal_reblock",
-		"tee"
-	)[-1]
+	src = pipeparts.mkfirbank(pipeline, src, latency = -int(round(bank_fragment.start * bank_fragment.rate)) - 1, fir_matrix = bank_fragment.orthogonal_template_bank)
+	src = pipeparts.mkreblock(pipeline, src, block_duration = 1 * gst.SECOND)
+	src = pipeparts.mktee(pipeline, src)
+	#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, src), "orthosnr_%s.dump" % logname, segment = nxydump_segment)
+
 	#pipeparts.mkvideosink(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkhistogram(pipeline, src), "video/x-raw-rgb, width=640, height=480, framerate=1/4"))
 	#pipeparts.mkogmvideosink(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkchannelgram(pipeline, pipeparts.mkqueue(pipeline, src), plot_width = .125), "video/x-raw-rgb, width=640, height=480, framerate=64/1"), "orthosnr_channelgram_%s.ogv" % logname, verbose = True)
 
@@ -343,43 +351,34 @@ def mkLLOIDbranch(pipeline, src, bank, bank_fragment, (control_snk, control_src)
 	# aggregator
 	#
 
-	mkelems_fast(pipeline,
-		src,
-		"lal_sumsquares", {"weights": bank_fragment.sum_of_squares_weights},
-		"queue",
-		"audioresample", {"quality": 9},
-		"lal_checktimestamps", {"name": "timestamps_%s_after_sumsquare_resampler" % logname},
-		control_snk
-	)
+	elem = pipeparts.mkresample(pipeline, pipeparts.mkqueue(pipeline, pipeparts.mksumsquares(pipeline, src, weights = bank_fragment.sum_of_squares_weights)), quality = 9)
+	elem = pipeparts.mkchecktimestamps(pipeline, elem, "timestamps_%s_after_sumsquare_resampler" % logname)
+	elem.link(control_snk)
 
 	#
 	# use sum-of-squares aggregate as gate control for orthogonal SNRs
 	#
 
-	elems = mkelems_fast(pipeline,
-		"lal_gate", {"threshold": bank.gate_threshold, "attack-length": gate_attack_length, "hold-length": gate_hold_length},
-		"lal_checktimestamps", {"name": "timestamps_%s_after_gate" % logname},
+	src = pipeparts.mkgate(pipeline, pipeparts.mkqueue(pipeline, src), threshold = bank.gate_threshold, attack_length = gate_attack_length, hold_length = gate_hold_length, control = pipeparts.mkqueue(pipeline, control_src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 1 * gst.SECOND))
+	src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_gate" % logname)
 
-		#
-		# buffer orthogonal SNRs
-		#
-		# FIXME:  teach the collectpads object not to wait for buffers on
-		# pads whose segments have not yet been reached by the input on the
-		# other pads.  then this large queue buffer will not be required
-		# because streaming can begin through the downstream adders without
-		# waiting for input from all upstream elements.
-
-		"queue", {"max-size-buffers": 0, "max-size-bytes": 0, "max-size-time": 2 * gst.SECOND},
+	#
+	# buffer orthogonal SNRs
+	#
+	# FIXME:  teach the collectpads object not to wait for buffers on
+	# pads whose segments have not yet been reached by the input on the
+	# other pads.  then this large queue buffer will not be required
+	# because streaming can begin through the downstream adders without
+	# waiting for input from all upstream elements.
 
-		#
-		# reconstruct physical SNRs
-		#
+	src = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 2 * gst.SECOND)
 
-		"lal_matrixmixer", {"matrix": bank_fragment.mix_matrix},
-	)
+	#
+	# reconstruct physical SNRs
+	#
 
-	mkelems_fast(pipeline, src, "queue", {"max-size-buffers": 0, "max-size-bytes": 0, "max-size-time": 5 * gst.SECOND})[-1].link_pads("src", elems[0], "sink")
-	mkelems_fast(pipeline, control_src, "queue", {"max-size-buffers": 0, "max-size-bytes": 0, "max-size-time": 1 * gst.SECOND})[-1].link_pads("src", elems[0], "control")
+	src = pipeparts.mkmatrixmixer(pipeline, src, matrix = bank_fragment.mix_matrix)
+	src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_matrixmixer" % logname)
 
 	#
 	# done
@@ -390,14 +389,10 @@ def mkLLOIDbranch(pipeline, src, bank, bank_fragment, (control_snk, control_src)
 	del bank_fragment.sum_of_squares_weights
 	del bank_fragment.mix_matrix
 
-	return elems[-1]
-
-
-def mkLLOIDhoftToSnr(pipeline, hoftdict, instrument, bank, control_snksrc, verbose = False, nxydump_segment = None):
-	"""Build pipeline fragment that converts h(t) to SNR."""
+	return src
 
-	logname = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
 
+def mkLLOIDhoftToSnr(pipeline, hoftdict, bank, control_snksrc, verbose = False, logname = "", nxydump_segment = None):
 	#
 	# parameters
 	#
@@ -420,59 +415,64 @@ def mkLLOIDhoftToSnr(pipeline, hoftdict, instrument, bank, control_snksrc, verbo
 			# firbank element, and the value here is only
 			# approximate and not tied to the fir bank
 			# parameters so might not work if those change
-			mkelems_fast(pipeline,
-				hoftdict[bank_fragment.rate],
-				"lal_delay", {"delay": int(round((bank.filter_length - bank_fragment.end) * bank_fragment.rate))},
-				"queue", {"max-size-bytes": 0, "max-size-buffers": 0, "max-size-time": 4 * int(math.ceil(bank.filter_length)) * gst.SECOND}
-			)[-1],
+			pipeparts.mkqueue(pipeline, pipeparts.mkdelay(pipeline, hoftdict[bank_fragment.rate], int(round((bank.filter_length - bank_fragment.end) * bank_fragment.rate))), max_size_bytes = 0, max_size_buffers = 0, max_size_time = 4 * int(math.ceil(bank.filter_length)) * gst.SECOND),
 			bank,
 			bank_fragment,
 			control_snksrc,
 			int(math.ceil(-autocorrelation_latency * (float(bank_fragment.rate) / output_rate))),
-			int(math.ceil(-autocorrelation_latency * (float(bank_fragment.rate) / output_rate)))
+			int(math.ceil(-autocorrelation_latency * (float(bank_fragment.rate) / output_rate))),
+			nxydump_segment = nxydump_segment
 		))
 
 	#
-	# sum snrs with common sample rates
+	# sum the snr streams, adding resamplers where needed.  at the
+	# start of this loop, branch_heads is a dictionary mapping sample
+	# rates to sets of matrix mixer elements.  we loop over the
+	# contents of the dictionary from lowest to highest sample rate
 	#
 
-	snr = None
 	next_rate = dict(zip(rates, rates[1:]))
 	for rate, heads in sorted(branch_heads.items()):
 		#
-		# hook matrix mixers to an adder
+		# hook all matrix mixers that share a common sample rate to
+		# an adder.  the adder replaces the set of matrix mixers as
+		# the new "head" associated with the sample rate
 		#
 
-		branchsnr = mkelems_fast(pipeline, "lal_adder", {"sync": True})[-1]
+		branch_heads[rate] = gst.element_factory_make("lal_adder")
+		branch_heads[rate].set_property("sync", True)
+		pipeline.add(branch_heads[rate])
 		for head in heads:
-			pipeparts.mkqueue(pipeline, head, max_size_time = gst.SECOND).link(branchsnr)
-		#logname = "%s_%d_%d" % (bank.logname, bank_fragment.start, bank_fragment.end)
-		#branchsnr = pipeparts.mktee(pipeline, branchsnr)
-		#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, branchsnr), "snr_%s_%02d.dump" % (logname, bank_fragment.start), segment = nxydump_segment)
+			pipeparts.mkqueue(pipeline, head, max_size_time = gst.SECOND).link(branch_heads[rate])
+
+		#
+		# if the reconstructed snr upto and including this sample
+		# rate is needed for something, like an early warning
+		# detection statistic, or to dump it to a file, it can be
+		# tee'ed off here
+		#
+
+		#branch_heads[rate] = pipeparts.mktee(pipeline, branch_heads[rate])
+		#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, branch_heads[rate]), "snr_%s_%d.dump" % (logname, rate), segment = nxydump_segment)
 
 		#
 		# if this isn't the highest sample rate, attach a resampler
-		# and add to heads for next highest sample rate.  otherwise
-		# this adder's output is the fully-reconstructed snr stream
+		# to the adder and add the resampler to the set of heads
+		# for next highest sample rate.
 		#
 
 		if rate in next_rate:
-			branchsnr = pipeparts.mkresample(pipeline, branchsnr, quality = 4)
-			#branchsnr = pipeparts.mkchecktimestamps(pipeline, branchsnr, "timestamps_%s_after_snr_resampler" % logname)
-			branch_heads[next_rate[rate]].add(branchsnr)
-		else:
-			assert snr is None	# only one snr element allowed
-			snr = branchsnr
-	assert snr is not None	# did we identify the snr element?
+			branch_heads[next_rate[rate]].add(pipeparts.mkresample(pipeline, branch_heads[rate], quality = 4))
 
 	#
-	# snr
+	# the adder for the highest sample rate provides the final
+	# reconstructed snr
 	#
 
-	return pipeparts.mktogglecomplex(pipeline, snr)
+	return pipeparts.mktogglecomplex(pipeline, branch_heads[max(rates)])
 
 
-def mkLLOIDsnrToTriggers(pipeline, snr, bank, verbose = False, nxydump_segment = None, logname = None):
+def mkLLOIDsnrToTriggers(pipeline, snr, bank, verbose = False, nxydump_segment = None, logname = ""):
 	"""Build pipeline fragment that converts single detector SNR into triggers."""
 	#
 	# parameters
@@ -482,36 +482,25 @@ def mkLLOIDsnrToTriggers(pipeline, snr, bank, verbose = False, nxydump_segment =
 	autocorrelation_length = bank.autocorrelation_bank.shape[1]
 	autocorrelation_latency = -(autocorrelation_length - 1) / 2
 
-	snr = pipeparts.mktee(pipeline, snr)
-
-	#pipeparts.mknxydumpsink(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkqueue(pipeline, snr)), "snr_%s.dump" % logname, segment = nxydump_segment)
-	#pipeparts.mkogmvideosink(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkchannelgram(pipeline, pipeparts.mkqueue(pipeline, snr), plot_width = .125), "video/x-raw-rgb, width=640, height=480, framerate=64/1"), "snr_channelgram_%s.ogv" % logname, audiosrc = pipeparts.mkaudioamplify(pipeline, pipeparts.mkqueue(pipeline, hoftdict[output_rate], max_size_time = 2 * int(math.ceil(bank.filter_length)) * gst.SECOND), 0.125), verbose = True)
-
 	#
 	# \chi^{2}
 	#
 
-	chisq = mkelems_fast(pipeline,
-		snr,
-		"queue",
-		"lal_autochisq", {"autocorrelation-matrix": pipeio.repack_complex_array_to_real(bank.autocorrelation_bank), "latency": autocorrelation_latency, "snr-thresh": bank.snr_threshold}
-	)[-1]
-	#chisq = pipeparts.mktee(pipeline, chisq)
-	#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, chisq), "chisq_%s.dump" % logname, segment = nxydump_segment)
+	snr = pipeparts.mktee(pipeline, snr)
+	chisq = pipeparts.mkautochisq(pipeline, pipeparts.mkqueue(pipeline, snr), autocorrelation_matrix = bank.autocorrelation_bank, latency = autocorrelation_latency, snr_thresh = bank.snr_threshold)
 	# FIXME:  find a way to use less memory without this hack
 	del bank.autocorrelation_bank
 
+	#chisq = pipeparts.mktee(pipeline, chisq)
+	#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, chisq), "chisq_%s.dump" % logname, segment = nxydump_segment)
+
 	#
 	# trigger generator and progress report
 	#
 
-	head = mkelems_fast(pipeline,
-		chisq,
-		"lal_triggergen", {"bank-filename": bank.template_bank_filename, "snr-thresh": bank.snr_threshold, "sigmasq": bank.sigmasq},
-	)[-1]
-	mkelems_fast(pipeline, snr, "queue", head)
+	head = pipeparts.mktriggergen(pipeline, pipeparts.mkqueue(pipeline, snr), chisq, template_bank_filename = bank.template_bank_filename, snr_threshold = bank.snr_threshold, sigmasq = bank.sigmasq)
 	if verbose:
-		head = mkelems_fast(pipeline, head, "progressreport", {"name": "progress_xml_%s" % logname})[-1]
+		head = pipeparts.mkprogressreport(pipeline, head, "progress_xml_%s" % logname)
 
 	#
 	# done
@@ -526,59 +515,71 @@ def mkLLOIDsnrToTriggers(pipeline, snr, bank, verbose = False, nxydump_segment =
 
 
 def mkLLOIDmulti(pipeline, seekevent, detectors, banks, psd, psd_fft_length = 8, fake_data = False, online_data = False, injection_filename = None, veto_segments=None, verbose = False, nxydump_segment = None):
-	#
-	# xml stream aggregator
-	#
-
-	# Input selector breaks seeks.  For a single detector and single template bank,
-	# we don't need an input selector.  This is an ugly kludge to make seeks work
-	# in this special (and very high priority) case.
-	needs_input_selector = (len(detectors) * len(banks) > 1)
-	if needs_input_selector:
-		nto1 = mkelems_fast(pipeline, "input-selector", {"select-all": True})[-1]
-
 	#
 	# loop over instruments and template banks
 	#
 
+	rates = set(rate for bank in banks for rate in bank.get_rates())
+	triggersrc = set()
 	for instrument in detectors:
-		rates = set(rate for bank in banks for rate in bank.get_rates())
-		head = mkLLOIDbasicsrc(pipeline, seekevent, instrument, detectors[instrument], fake_data = fake_data, online_data = online_data, injection_filename = injection_filename, verbose = verbose)
+		src = mkLLOIDbasicsrc(pipeline, seekevent, instrument, detectors[instrument], fake_data = fake_data, online_data = online_data, injection_filename = injection_filename, verbose = verbose)
 		# let the frame reader and injection code run in a
 		# different thread than the whitener, etc.,
-		head = pipeparts.mkqueue(pipeline, head)
+		src = pipeparts.mkqueue(pipeline, src)
 		if veto_segments:
-			hoftdict = mkLLOIDsrc(pipeline, head, rates, psd = psd, psd_fft_length = psd_fft_length, seekevent = seekevent, veto_segments = veto_segments[instrument])
+			hoftdict = mkLLOIDsrc(pipeline, src, rates, psd = psd, psd_fft_length = psd_fft_length, seekevent = seekevent, veto_segments = veto_segments[instrument], nxydump_segment = nxydump_segment)
 		else:
-			hoftdict = mkLLOIDsrc(pipeline, head, rates, psd = psd, psd_fft_length = psd_fft_length, veto_segments = None)
+			hoftdict = mkLLOIDsrc(pipeline, src, rates, psd = psd, psd_fft_length = psd_fft_length, veto_segments = None, nxydump_segment = nxydump_segment)
 		for bank in banks:
-			control_snksrc = mkcontrolsnksrc(pipeline, max(bank.get_rates()), verbose = verbose, suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or "")))
-			#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control_snksrc[1]), "control_%s.dump" % bank.logname, segment = nxydump_segment)
-			snr = mkLLOIDhoftToSnr(
+			suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
+			control_snksrc = mkcontrolsnksrc(pipeline, max(bank.get_rates()), verbose = verbose, suffix = suffix)
+			#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control_snksrc[1]), "control_%s.dump" % suffix, segment = nxydump_segment)
+			head = mkLLOIDhoftToSnr(
 				pipeline,
 				hoftdict,
-				instrument,
 				bank,
 				control_snksrc,
 				verbose = verbose,
+				logname = suffix,
 				nxydump_segment = nxydump_segment
 			)
-			head = mkLLOIDsnrToTriggers(
+			head = pipeparts.mkchecktimestamps(pipeline, head, "timestamps_%s_snr" % suffix)
+			#head = pipeparts.mktee(pipeline, head)
+			#pipeparts.mknxydumpsink(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkqueue(pipeline, head)), "snr_%s.dump" % suffix, segment = nxydump_segment)
+			#pipeparts.mkogmvideosink(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkchannelgram(pipeline, pipeparts.mkqueue(pipeline, head), plot_width = .125), "video/x-raw-rgb, width=640, height=480, framerate=64/1"), "snr_channelgram_%s.ogv" % suffix, audiosrc = pipeparts.mkaudioamplify(pipeline, pipeparts.mkqueue(pipeline, hoftdict[max(rates)], max_size_time = 2 * int(math.ceil(bank.filter_length)) * gst.SECOND), 0.125), verbose = True)
+			triggersrc.add(mkLLOIDsnrToTriggers(
 				pipeline,
-				snr,
+				head,
 				bank,
 				verbose = verbose,
 				nxydump_segment = nxydump_segment,
-				logname = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
-			)
-			if needs_input_selector:
-				mkelems_fast(pipeline, head, "queue", nto1)
+				logname = suffix
+			))
+
+	#
+	# if there is more than one trigger source, use an n-to-1 adapter
+	# to combine into a single stream
+	#
+	# FIXME:  it has been reported that the input selector breaks
+	# seeks.  confirm and fix if needed
+
+	assert len(triggersrc) > 0
+	if len(triggersrc) > 1:
+		# FIXME:  input-selector in 0.10.32 no longer has the
+		# "select-all" feature.  need to get this re-instated
+		assert False	# force crash until input-selector problem is fixed
+		nto1 = gst.element_factory_make("input-selector")
+		nto1.set_property("select-all", True)
+		pipeline.add(nto1)
+		for head in triggersrc:
+			pipeparts.mkqueue(pipeline, head).link(nto1)
+		triggersrc = nto1
+	else:
+		# len(triggersrc) == 1
+		triggersrc, = triggersrc
 
 	#
 	# done
 	#
 
-	if needs_input_selector:
-		return nto1
-	else:
-		return mkelems_fast(pipeline, head, "queue")[-1]
+	return triggersrc
diff --git a/python/misc.py b/python/misc.py
index 848a2b8ebc..50a6654f82 100644
--- a/python/misc.py
+++ b/python/misc.py
@@ -81,4 +81,3 @@ def max_stat_thresh(coeffs, fap, samp_tol=100.0):
 
 def ss_coeffs(S, amp=5.5):
 	return S**2 / (S**2 + len(S) / amp**2 )
-	
-- 
GitLab