From 5c2924473b3dd688f391fa8adb3ac816660eb088 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Wed, 28 Aug 2019 11:15:35 -0700
Subject: [PATCH] move functions from gstlal_inspiral_pipe to inspiral_pipe.py
 for reusability

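Move the utility functions (sim_tag_from_inj_file, get_bank_params,
analysis_segments, ...), the DAG utilities (get_threshold_values,
inputs_to_db, cache_to_db, get_rank_file) and the DAG layer functions
(ref_psd_layer, svd_layer, inspiral_layer, ..., summary_page_layer)
into inspiral_pipe.py so that other workflow generators can reuse
them.  Values these functions previously read from globals in
gstlal_inspiral_pipe (output_dir, boundary_seg, instruments,
psd_nodes, options) are now passed explicitly at the call sites.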
---
 gstlal-inspiral/bin/gstlal_inspiral_pipe | 1130 +--------------------
 gstlal-inspiral/python/inspiral_pipe.py  | 1145 +++++++++++++++++++++-
 2 files changed, 1165 insertions(+), 1110 deletions(-)

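As a sketch of the intended reuse (illustrative only; it assumes a
caller that sets up options, detectors, channel_dict, boundary_seg,
dag and jobs the same way gstlal_inspiral_pipe does), another
DAG-generation script could now call the relocated helpers directly:

    from gstlal import inspiral_pipe

    # chirp-mass boundaries, bank cache and maximum template duration
    template_mchirp_dict, bank_cache, max_time = inspiral_pipe.get_bank_params(options)

    # analyzable segments for each detector combination
    segsdict = inspiral_pipe.analysis_segments(
        set(bank_cache.keys()), detectors.frame_segments,
        boundary_seg, max_time, options.min_instruments)

    # one reference-PSD job per segment
    psd_nodes = inspiral_pipe.ref_psd_layer(dag, jobs, [], segsdict, channel_dict, options)
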
diff --git a/gstlal-inspiral/bin/gstlal_inspiral_pipe b/gstlal-inspiral/bin/gstlal_inspiral_pipe
index 97798028a5..cbf2b0258e 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_inspiral_pipe
@@ -43,23 +43,17 @@ __author__ = 'Chad Hanna <chad.hanna@ligo.org>, Patrick Godwin <patrick.godwin@l
 ### imports
 
 from collections import OrderedDict
-import functools
-import itertools
-import os
 from optparse import OptionParser
-import stat
+import os
 
 import numpy
 
-import lal.series
 from lal.utils import CacheEntry
 
-from ligo import segments
 from ligo.lw import ligolw
 from ligo.lw import lsctables
 import ligo.lw.utils as ligolw_utils
 
-from gstlal import inspiral
 from gstlal import inspiral_pipe
 from gstlal import dagparts
 from gstlal import datasource
@@ -74,150 +68,6 @@ class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
 lsctables.use_in(LIGOLWContentHandler)
 
 
-#----------------------------------------------------------
-### utility functions
-
-def sim_tag_from_inj_file(injections):
-	if injections is None:
-		return None
-	return injections.replace('.xml', '').replace('.gz', '').replace('-','_')
-
-def get_bank_params(options, verbose = False):
-	bank_cache = {}
-	for bank_cache_str in options.bank_cache:
-		for c in bank_cache_str.split(','):
-			ifo = c.split("=")[0]
-			cache = c.replace(ifo+"=","")
-			bank_cache.setdefault(ifo, []).append(cache)
-
-	max_time = 0
-	template_mchirp_dict = {}
-	for n, cache in enumerate(bank_cache.values()[0]):
-		for ce in map(CacheEntry, open(cache)):
-			for ce in map(CacheEntry, open(ce.path)):
-				xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler)
-				snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc)
-				max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration')))
-				idx = options.overlap[n]/2
-				template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[idx:-idx]), max(snglinspiraltable.getColumnByName('mchirp')[idx:-idx])]
-				xmldoc.unlink()
-
-	return template_mchirp_dict, bank_cache, max_time
-
-def subdir_path(dirlist):
-	output_path = '/'.join(dirlist)
-	try:
-		os.mkdir(output_path)
-	except:
-		pass
-	return output_path
-
-def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
-	"""get a dictionary of all the disjoint segments for each combination of at least min_instruments detectors
-	"""
-	segsdict = segments.segmentlistdict()
-	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
-	start_pad = 512 + max_template_length
-	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
-	segment_length = int(5 * start_pad)
-	for n in range(min_instruments, 1 + len(analyzable_instruments_set)):
-		for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n):
-			segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos))
-			segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg])
-			segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad)
-			segsdict[frozenset(ifo_combos)] = dagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad)
-			if not segsdict[frozenset(ifo_combos)]:
-				del segsdict[frozenset(ifo_combos)]
-	return segsdict
-
-def create_svd_bank_strings(svd_nodes, instruments = None):
-	# FIXME assume that the number of svd nodes is the same per ifo, a good assumption though
-	outstrings = []
-	for i in range(len(svd_nodes.values()[0])):
-		svd_bank_string = ""
-		for ifo in svd_nodes:
-			if instruments is not None and ifo not in instruments:
-				continue
-			try:
-				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"])
-			except AttributeError:
-				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i])
-		svd_bank_string = svd_bank_string.strip(",")
-		outstrings.append(svd_bank_string)
-	return outstrings
-
-def svd_bank_cache_maker(svd_bank_strings, injection = False):
-	if injection:
-		dir_name = "gstlal_inspiral_inj"
-	else:
-		dir_name = "gstlal_inspiral"
-	svd_cache_entries = []
-	parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
-	for svd_bank_parsed_dict in parsed_svd_bank_strings:
-		for filename in svd_bank_parsed_dict.itervalues():
-			svd_cache_entries.append(CacheEntry.from_T050017(filename))
-
-	return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries]
-
-def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
-	# first get the previous output in a usable form
-	lloid_output = {}
-	for inj in options.injections + [None]:
-		lloid_output[sim_tag_from_inj_file(inj)] = {}
-	lloid_diststats = {}
-	if options.dist_stats_cache:
-		for ce in map(CacheEntry, open(options.dist_stats_cache)):
-			lloid_diststats[ce.description.split("_")[0]] = [ce.path]
-	for ifos in segsdict:
-		for seg in segsdict[ifos]:
-			# iterate over the mass space chunks for each segment
-			for node in inspiral_nodes[(ifos, None)][seg]:
-				if node is None:
-					break
-				len_out_files = len(node.output_files["output-cache"])
-				for f in node.output_files["output-cache"]:
-					# Store the output files and the node for use as a parent dependency
-					lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node]))
-				for f in node.output_files["ranking-stat-output-cache"]:
-					lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f)
-			for inj in options.injections:
-				for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]:
-					if injnode is None:
-						continue
-					for f in injnode.output_files["output-cache"]:
-						# Store the output files and the node and injnode for use as parent dependencies
-						bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
-						try:
-							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode]))
-						except KeyError:
-							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode]))
-
-	return lloid_output, lloid_diststats
-
-def set_up_scripts(options):
-	# Make an xml integrity checker
-	if options.gzip_test:
-		with open("gzip_test.sh", "w") as f:
-			f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
-		os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
-
-	# A pre script to backup data before feeding to lossy programs
-	# (e.g. clustering routines)
-	with open("store_raw.sh", "w") as f:
-		f.write("""#!/bin/bash
-		for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
-	os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
-
-def load_reference_psd(options):
-	ref_psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))
-
-	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
-	with open('reference_psd.cache', "w") as output_cache_file:
-		output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(options.reference_psd)))
-
-	return ref_psd
-
-
 #----------------------------------------------------------
 ### command line options
 
@@ -376,38 +226,6 @@ def parse_command_line():
 #----------------------------------------------------------
 ### DAG utilities
 
-def get_threshold_values(bgbin_indices, svd_bank_strings, options):
-	"""Calculate the appropriate ht-gate-threshold values according to the scale given
-	"""
-	if options.ht_gate_threshold_linear is not None:
-		# A scale is given
-		mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
-		# use max mchirp in a given svd bank to decide gate threshold
-		bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
-		gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)
-		return [gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
-	elif options.ht_gate_threshold is not None:
-		return [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
-	else:
-		return None
-
-def inputs_to_db(jobs, inputs, job_type = 'toSqlite'):
-	dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
-	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
-	return os.path.join(subdir_path([jobs[job_type].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
-
-def cache_to_db(cache, jobs):
-	hi_index = cache[-1].description.split('_')[0]
-	db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path))
-	db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
-	return db
-
-def get_rank_file(instruments, boundary_seg, n, basename, job=None):
-	if job:
-		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path)
-	else:
-		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.cache')
-
 def set_up_jobs(options):
 	jobs = {}
 
@@ -519,910 +337,6 @@ def set_up_jobs(options):
 
 	return jobs
 
-#----------------------------------------------------------
-### DAG layers
-
-def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options):
-	psd_nodes = {}
-	for ifos in segsdict:
-		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
-		for seg in segsdict[ifos]:
-			psd_path = subdir_path([jobs['refPSD'].output_path, str(int(seg[0]))[:5]])
-			psd_nodes[(ifos, seg)] = dagparts.DAGNode(
-				jobs['refPSD'],
-				dag,
-				parent_nodes = parent_nodes,
-				opts = {
-					"gps-start-time":int(seg[0]),
-					"gps-end-time":int(seg[1]),
-					"data-source":"frames",
-					"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
-					"psd-fft-length":options.psd_fft_length,
-					"frame-segments-name": options.frame_segments_name
-				},
-				input_files = {
-					"frame-cache":options.frame_cache,
-					"frame-segments-file":options.frame_segments_file
-				},
-				output_files = {
-					"write-psd":dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = psd_path)
-				},
-			)
-
-	# Make the reference PSD cache
-	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
-	with open('reference_psd.cache', "w") as output_cache_file:
-		for node in psd_nodes.values():
-			output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(node.output_files["write-psd"])))
-
-	return psd_nodes
-
-def median_psd_layer(dag, jobs, parent_nodes, *args):
-	gpsmod5 = str(int(boundary_seg[0]))[:5]
-	median_psd_path = subdir_path([jobs['medianPSD'].output_path, gpsmod5])
-
-	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
-	median_psd_nodes = []
-	for chunk, nodes in enumerate(dagparts.groups(parent_nodes.values(), 50)):
-		median_psd_node = \
-			dagparts.DAGNode(jobs['medianPSD'], dag,
-				parent_nodes = parent_nodes.values(),
-				input_files = {"": [node.output_files["write-psd"] for node in nodes]},
-				output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD_CHUNK_%04d" % chunk, boundary_seg, '.xml.gz', path = median_psd_path)}
-			)
-		median_psd_nodes.append(median_psd_node)
-
-	median_psd_node = \
-		dagparts.DAGNode(jobs['medianPSD'], dag,
-			parent_nodes = median_psd_nodes,
-			input_files = {"": [node.output_files["output-name"] for node in median_psd_nodes]},
-			output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([jobs['medianPSD'].output_path, gpsmod5]))}
-		)
-	return median_psd_node
-
-def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_mchirp_dict):
-	svd_nodes = {}
-	new_template_mchirp_dict = {}
-	svd_dtdphi_map = {}
-	for ifo, list_of_svd_caches in bank_cache.items():
-		bin_offset = 0
-		for j, svd_caches in enumerate(list_of_svd_caches):
-			svd_caches = map(CacheEntry, open(svd_caches))
-			for i, individual_svd_cache in enumerate(ce.path for ce in svd_caches):
-				# First sort out the clipleft, clipright options
-				clipleft = []
-				clipright = []
-				ids = []
-				mchirp_interval = (float("inf"), 0)
-				individual_svd_cache = map(CacheEntry, open(individual_svd_cache))
-				for n, f in enumerate(ce.path for ce in individual_svd_cache):
-					# handle template bank clipping
-					clipleft.append(options.overlap[j] / 2)
-					clipright.append(options.overlap[j] / 2)
-					ids.append("%d_%d" % (i+bin_offset, n))
-					if f in template_mchirp_dict:
-						mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))
-					svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j]
-
-				svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path)
-				if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
-					new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval
-
-				svdnode = dagparts.DAGNode(
-					jobs['svd'],
-					dag,
-					parent_nodes = parent_nodes,
-					opts = {
-						"svd-tolerance":options.tolerance,
-						"flow":options.flow[j],
-						"sample-rate":options.sample_rate,
-						"clipleft":clipleft,
-						"clipright":clipright,
-						"samples-min":options.samples_min[j],
-						"samples-max-256":options.samples_max_256,
-						"samples-max-64":options.samples_max_64,
-						"samples-max":options.samples_max,
-						"autocorrelation-length":options.autocorrelation_length,
-						"bank-id":ids,
-						"identity-transform":options.identity_transform,
-						"ortho-gate-fap":0.5
-					},
-					input_files = {"reference-psd":psd},
-					input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]},
-					input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"),
-					output_files = {"write-svd":svd_bank_name},
-				)
-
-				# impose a priority to help with depth first submission
-				svdnode.set_priority(99)
-				svd_nodes.setdefault(ifo, []).append(svdnode)
-			bin_offset += i+1
-
-	# Plot template/svd bank jobs
-	primary_ifo = bank_cache.keys()[0]
-	dagparts.DAGNode(
-		jobs['plotBanks'],
-		dag,
-		parent_nodes = sum(svd_nodes.values(),[]),
-		opts = {"plot-template-bank":"", "output-dir": output_dir},
-		input_files = {"template-bank-file":options.template_bank},
-	)
-
-	return svd_nodes, new_template_mchirp_dict, svd_dtdphi_map
-
-def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):
-	inspiral_nodes = {}
-	for ifos in segsdict:
-		# FIXME: handles more than 3 ifos with same cpu/memory requests
-		inspiral_name = 'gstlalInspiral%dIFO' % min(len(ifos), 3)
-		inspiral_inj_name = 'gstlalInspiralInj%dIFO' % min(len(ifos), 3)
-
-		# setup dictionaries to hold the inspiral nodes
-		inspiral_nodes[(ifos, None)] = {}
-		ignore = {}
-		injection_files = []
-		for injections in options.injections:
-			min_chirp_mass, max_chirp_mass, injections = injections.split(':')
-			injection_files.append(injections)
-			min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass)
-			inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
-			ignore[injections] = []
-			for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)):
-				if max_chirp_mass <= bounds[0]:
-					ignore[injections].append(int(bgbin_index))
-					# NOTE putting a break here assumes that the min chirp mass
-					# in a subbank increases with bin number, i.e. XXXX+1 has a
-					# greater minimum chirpmass than XXXX, for all XXXX. Note
-					# that the reverse is not true, bin XXXX+1 may have a lower
-					# max chirpmass than bin XXXX.
-				elif min_chirp_mass > bounds[1]:
-					ignore[injections].append(int(bgbin_index))
-
-		# FIXME choose better splitting?
-		numchunks = 50
-
-		# only use a channel dict with the relevant channels
-		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
-
-		# get the svd bank strings
-		svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys())
-
-		# get a mapping between chunk counter and bgbin for setting priorities
-		bgbin_chunk_map = {}
-
-		for seg in segsdict[ifos]:
-			if injection_files:
-				output_seg_inj_path = subdir_path([jobs[inspiral_inj_name].output_path, str(int(seg[0]))[:5]])
-
-			if jobs[inspiral_name] is None:
-				# injection-only run
-				inspiral_nodes[(ifos, None)].setdefault(seg, [None])
-
-			else:
-				output_seg_path = subdir_path([jobs[inspiral_name].output_path, str(int(seg[0]))[:5]])
-				for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)):
-					bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)]
-					# setup output names
-					output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)]
-					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
-					dist_stat_names = [dagparts.T050017_filename(ifos, '%s_DIST_STATS' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
-
-					for bgbin in bgbin_indices:
-						bgbin_chunk_map.setdefault(bgbin, chunk_counter)
-
-					# Calculate the appropriate ht-gate-threshold values according to the scale given
-					threshold_values = get_threshold_values(bgbin_indices, svd_bank_strings, options)
-
-					# non injection node
-					noninjnode = dagparts.DAGNode(jobs[inspiral_name], dag,
-						parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]),
-						opts = {
-							"psd-fft-length":options.psd_fft_length,
-							"ht-gate-threshold":threshold_values,
-							"frame-segments-name":options.frame_segments_name,
-							"gps-start-time":int(seg[0]),
-							"gps-end-time":int(seg[1]),
-							"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
-							"tmp-space":dagparts.condor_scratch_space(),
-							"track-psd":"",
-							"control-peak-time":options.control_peak_time,
-							"coincidence-threshold":options.coincidence_threshold,
-							"singles-threshold":options.singles_threshold,
-							"fir-stride":options.fir_stride,
-							"data-source":"frames",
-							"local-frame-caching":"",
-							"min-instruments":options.min_instruments,
-							"reference-likelihood-file":options.reference_likelihood_file
-						},
-						input_files = {
-							"time-slide-file":options.time_slide_file,
-							"frame-cache":options.frame_cache,
-							"frame-segments-file":options.frame_segments_file,
-							"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
-							"blind-injections":options.blind_injections,
-							"veto-segments-file":options.vetoes,
-						},
-						input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)},
-						output_cache_files = {
-							"output-cache":output_names,
-							"ranking-stat-output-cache":dist_stat_names
-						}
-					)
-
-					# Set a post script to check for file integrity
-					if options.gzip_test:
-						noninjnode.set_post_script("gzip_test.sh")
-						noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))
-
-					# impose a priority to help with depth first submission
-					noninjnode.set_priority(chunk_counter+15)
-
-					inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)
-
-			# process injections
-			for injections in injection_files:
-				# setup output names
-				sim_name = sim_tag_from_inj_file(injections)
-
-				bgbin_svd_bank_strings = [bgbin_svdbank for i, bgbin_svdbank in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]]
-
-				for chunk_counter, bgbin_list in enumerate(dagparts.groups(bgbin_svd_bank_strings, numchunks)):
-					bgbin_indices, svd_bank_strings = zip(*bgbin_list)
-					output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices]
-					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (idx, sim_name), seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
-					svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))]
-					try:
-						reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
-						parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
-					except AttributeError: ### injection-only run
-						reference_psd = psd_nodes[(ifos, seg)]
-						parents = []
-
-					svd_files = [CacheEntry.from_T050017(filename) for filename in svd_names]
-					input_cache_name = dagparts.group_T050017_filename_from_T050017_files(svd_files, '.cache').replace('SVD', 'SVD_%s' % sim_name)
-
-					# Calculate the appropriate ht-gate-threshold values according to the scale given
-					threshold_values = get_threshold_values(bgbin_indices, svd_bank_strings, options)
-
-					# setup injection node
-					# FIXME: handles more than 3 ifos with same cpu/memory requests
-					injnode = dagparts.DAGNode(jobs[inspiral_inj_name], dag,
-						parent_nodes = parents,
-						opts = {
-							"psd-fft-length":options.psd_fft_length,
-							"ht-gate-threshold":threshold_values,
-							"frame-segments-name":options.frame_segments_name,
-							"gps-start-time":int(seg[0]),
-							"gps-end-time":int(seg[1]),
-							"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
-							"tmp-space":dagparts.condor_scratch_space(),
-							"track-psd":"",
-							"control-peak-time":options.control_peak_time,
-							"coincidence-threshold":options.coincidence_threshold,
-							"singles-threshold":options.singles_threshold,
-							"fir-stride":options.fir_stride,
-							"data-source":"frames",
-							"local-frame-caching":"",
-							"min-instruments":options.min_instruments,
-							"reference-likelihood-file":options.reference_likelihood_file
-						},
-						input_files = {
-							"time-slide-file":options.inj_time_slide_file,
-							"frame-cache":options.frame_cache,
-							"frame-segments-file":options.frame_segments_file,
-							"reference-psd":reference_psd,
-							"veto-segments-file":options.vetoes,
-							"injections": injections
-						},
-						input_cache_files = {"svd-bank-cache":svd_names},
-						input_cache_file_name = input_cache_name,
-						output_cache_files = {"output-cache":output_names}
-					)
-					# Set a post script to check for file integrity
-					if options.gzip_test:
-						injnode.set_post_script("gzip_test.sh")
-						injnode.add_post_script_arg(" ".join(output_names))
-
-					# impose a priority to help with depth first submission
-					if bgbin_chunk_map:
-						injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1)
-					else:
-						injnode.set_priority(chunk_counter+1)
-
-					inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)
-
-	# Replace mchirplo:mchirphi:inj.xml with inj.xml
-	options.injections = [inj.split(':')[-1] for inj in options.injections]
-
-	# NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
-	lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)
-
-	return inspiral_nodes, lloid_output, lloid_diststats
-
-def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs):
-	ligolw_add_nodes = []
-	for inj in options.injections:
-		inj_snr_nodes = []
-
-		inj_splitter_node = dagparts.DAGNode(jobs['injSplitter'], dag, parent_nodes=[],
-			opts = {
-				"output-path":jobs['injSplitter'].output_path,
-				"usertag": sim_tag_from_inj_file(inj.split(":")[-1]),
-				"nsplit": num_split_inj_snr_jobs
-			},
-			input_files = {"": inj.split(":")[-1]}
-		)
-		inj_splitter_node.set_priority(98)
-
-		# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
-		injection_files = ["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)]
-		for injection_file in injection_files:
-			injSNRnode = dagparts.DAGNode(jobs['gstlalInjSnr'], dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node],
-				# FIXME somehow choose the actual flow based on mass?
-				# max(flow) is chosen for performance, not
-				# correctness; hopefully it will be good
-				# enough
-				opts = {"flow":max(options.flow),"fmax":options.fmax},
-				input_files = {
-					"injection-file": injection_file,
-					"reference-psd-cache": "reference_psd.cache"
-				}
-			)
-			injSNRnode.set_priority(98)
-			inj_snr_nodes.append(injSNRnode)
-
-		addnode = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes=inj_snr_nodes,
-			input_files = {"": ' '.join(injection_files)},
-			output_files = {"output": inj.split(":")[-1]}
-		)
-
-		ligolw_add_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [addnode],
-			opts = {"sql-file":options.injection_proc_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-			input_files = {"":addnode.output_files["output"]}
-			)
-		)
-	return ligolw_add_nodes
-
-def summary_plot_layer(dag, jobs, parent_nodes, options, injdbs, noninjdb):
-	plotnodes = []
-
-	### common plot options
-	common_plot_opts = {
-		"segments-name": options.frame_segments_name,
-		"tmp-space": dagparts.condor_scratch_space(),
-		"output-dir": output_dir,
-		"likelihood-file":"post_marginalized_likelihood.xml.gz",
-		"shrink-data-segments": 32.0,
-		"extend-veto-segments": 8.,
-	}
-	sensitivity_opts = {
-		"output-dir":output_dir,
-		"tmp-space":dagparts.condor_scratch_space(),
-		"veto-segments-name":"vetoes",
-		"bin-by-source-type":"",
-		"dist-bins":200,
-		"data-segments-name":"datasegments"
-	}
-
-	### plot summary
-	opts = {"user-tag": "ALL_LLOID_COMBINED", "remove-precession": ""}
-	opts.update(common_plot_opts)
-	plotnodes.append(dagparts.DAGNode(jobs['plotSummary'], dag, parent_nodes=[farnode],
-		opts = opts,
-		input_files = {"": [noninjdb] + injdbs}
-	))
-
-	### isolated precession plot summary
-	opts = {"user-tag": "PRECESSION_LLOID_COMBINED", "isolate-precession": "", "plot-group": 1}
-	opts.update(common_plot_opts)
-	plotnodes.append(dagparts.DAGNode(jobs['plotSummaryIsolatePrecession'], dag, parent_nodes=[farnode],
-		opts = opts,
-		input_files = {"":[noninjdb] + injdbs}
-	))
-
-	for injdb in injdbs:
-		### individual injection plot summary
-		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1], "remove-precession": "", "plot-group": 1}
-		opts.update(common_plot_opts)
-		plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummary'], dag, parent_nodes=[farnode],
-			opts = opts,
-			input_files = {"":[noninjdb] + [injdb]}
-		))
-
-		### isolated precession injection plot summary
-		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "isolate-precession": "", "plot-group": 1}
-		opts.update(common_plot_opts)
-		plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummaryIsolatePrecession'], dag, parent_nodes=[farnode],
-			opts = opts,
-			input_files = {"":[noninjdb] + [injdb]}
-		))
-
-	### sensitivity plots
-	opts = {"user-tag": "ALL_LLOID_COMBINED"}
-	opts.update(sensitivity_opts)
-	plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode],
-		opts = opts,
-		input_files = {"zero-lag-database": noninjdb, "": injdbs}
-	))
-
-	for injdb in injdbs:
-		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1]}
-		opts.update(sensitivity_opts)
-		plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode],
-			opts = opts,
-			input_files = {"zero-lag-database": noninjdb, "": injdb}
-		))
-
-	### background plots
-	plotnodes.append(dagparts.DAGNode(jobs['plotBackground'], dag, parent_nodes = [farnode],
-		opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir},
-		input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb}
-	))
-
-	return plotnodes
-
-def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete):
-	"""clean intermediate merger products
-	"""
-	for db in dbs_to_delete:
-		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
-			input_files = {"": db}
-		)
-
-	for margfile in margfiles_to_delete:
-		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
-			input_files = {"": margfile}
-		)
-	return None
-
-def inj_psd_layer(segsdict, options):
-	psd_nodes = {}
-	psd_cache_files = {}
-	for ce in map(CacheEntry, open(options.psd_cache)):
-		psd_cache_files.setdefault(frozenset(lsctables.instrumentsproperty.get(ce.observatory)), []).append((ce.segment, ce.path))
-	for ifos in segsdict:
-		reference_psd_files = sorted(psd_cache_files[ifos], key = lambda (s, p): s)
-		ref_psd_file_num = 0
-		for seg in segsdict[ifos]:
-			while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]):
-				ref_psd_file_num += 1
-			psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1]
-	ref_psd_parent_nodes = []
-	return psd_nodes, ref_psd_parent_nodes
-
-def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
-	"""mass model node
-	"""
-	if options.mass_model_file is None:
-		# choose, arbitrarily, the lowest instrument in alphabetical order
-		model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = jobs['model'].output_path)
-		model_node = dagparts.DAGNode(jobs['model'], dag,
-			input_files = {"template-bank": options.template_bank, "reference-psd": psd},
-			opts = {"model":options.mass_model},
-			output_files = {"output": model_file_name},
-			parent_nodes = parent_nodes
-		)
-		return [model_node], model_file_name
-	else:
-		return [], options.mass_model_file
-
-def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None):
-	"""merge and cluster from sqlite database
-	"""
-	if input_files:
-		input_ = {"": input_files}
-	else:
-		input_ = {}
-
-	# Merge database into chunks
-	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
-		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-		input_files = input_,
-		input_cache_files = {"input-cache": db_cache},
-		output_files = {"database":db},
-		input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
-	)
-
-	# cluster database
-	return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-		opts = {"sql-file": sqlfile, "tmp-space": dagparts.condor_scratch_space()},
-		input_files = {"": db}
-	)
-
-def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map):
-	instruments = "".join(sorted(instrument_set))
-	margnodes = {}
-
-	# NOTE! we rely on there being identical templates in each instrument,
-	# so we just take one of the values of the svd_nodes which are a dictionary
-	# FIXME, the svd nodes list has to be the same as the sorted keys of
-	# lloid_output.  svd nodes should be made into a dictionary much
-	# earlier in the code to prevent a mishap
-	one_ifo_svd_nodes = dict(("%04d" % n, node) for n, node in enumerate( svd_nodes.values()[0]))
-
-	# Here n counts the bins
-	# FIXME - this is broken for injection dags right now because of marg nodes
-	# first non-injections, which will get skipped if this is an injections-only run
-	for bin_key in sorted(lloid_output[None].keys()):
-		outputs = lloid_output[None][bin_key]
-		diststats = lloid_diststats[bin_key]
-		inputs = [o[0] for o in outputs]
-		parents = dagparts.flatten([o[1] for o in outputs])
-		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)
-
-		# FIXME we keep this here in case we someday want to have a
-		# mass bin dependent prior, but it really doesn't matter for
-		# the time being.
-		priornode = dagparts.DAGNode(jobs['createPriorDistStats'], dag,
-			parent_nodes = [one_ifo_svd_nodes[bin_key]] + model_node or [],
-			opts = {
-				"instrument": instrument_set,
-				"background-prior": 1,
-				"min-instruments": options.min_instruments,
-				"coincidence-threshold":options.coincidence_threshold,
-				"df": "bandwidth"
-			},
-			input_files = {
-				"svd-file": one_ifo_svd_nodes[bin_key].output_files["write-svd"],
-				"mass-model-file": model_file,
-				"dtdphi-file": svd_dtdphi_map[bin_key],
-				"psd-xml": ref_psd
-			},
-			output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=jobs['createPriorDistStats'])}
-		)
-		# Create a file that has the priors *and* all of the diststats
-		# for a given bin marginalized over time. This is all that will
-		# be needed to compute the likelihood
-		diststats_per_bin_node = dagparts.DAGNode(jobs['marginalize'], dag,
-			parent_nodes = [priornode] + parents,
-			opts = {"marginalize": "ranking-stat"},
-			input_cache_files = {"likelihood-cache": diststats + [priornode.output_files["write-likelihood"]]},
-			output_files = {"output": rankfile('MARG_DIST_STATS', job=jobs['marginalize'])},
-			input_cache_file_name = rankfile('MARG_DIST_STATS')
-		)
-
-		margnodes[bin_key] = diststats_per_bin_node
-
-	return margnodes
-
-def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set):
-	rankpdf_nodes = []
-	rankpdf_zerolag_nodes = []
-	instruments = "".join(sorted(instrument_set))
-
-	# Here n counts the bins
-	for bin_key in sorted(marg_nodes.keys()):
-		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)
-
-		calcranknode = dagparts.DAGNode(jobs['calcRankPDFs'], dag,
-			parent_nodes = [marg_nodes[bin_key]],
-			opts = {"ranking-stat-samples":options.ranking_stat_samples},
-			input_files = {"": marg_nodes[bin_key].output_files["output"]},
-			output_files = {"output": rankfile('CALC_RANK_PDFS', job=jobs['calcRankPDFs'])},
-		)
-
-		calcrankzerolagnode = dagparts.DAGNode(jobs['calcRankPDFsWithZerolag'], dag,
-			parent_nodes = [marg_nodes[bin_key]],
-			opts = {"add-zerolag-to-background": "", "ranking-stat-samples": options.ranking_stat_samples},
-			input_files = {"": marg_nodes[bin_key].output_files["output"]},
-			output_files = {"output": rankfile('CALC_RANK_PDFS_WZL', job=jobs['calcRankPDFsWithZerolag'])},
-		)
-
-		rankpdf_nodes.append(calcranknode)
-		rankpdf_zerolag_nodes.append(calcrankzerolagnode)
-
-	return rankpdf_nodes, rankpdf_zerolag_nodes
-
-def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
-	likelihood_nodes = {}
-	instruments = "".join(sorted(instrument_set))
-
-	# non-injection jobs
-	for bin_key in sorted(lloid_output[None].keys()):
-		outputs = lloid_output[None][bin_key]
-		diststats = lloid_diststats[bin_key]
-		inputs = [o[0] for o in outputs]
-
-		# (input files for next job, dist stat files, parents for next job)
-		likelihood_nodes[None, bin_key] = (inputs, marg_nodes[bin_key].output_files["output"], [marg_nodes[bin_key]])
-
-	# injection jobs
-	for inj in options.injections:
-		lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
-		for bin_key in sorted(lloid_nodes.keys()):
-			outputs = lloid_nodes[bin_key]
-			diststats = lloid_diststats[bin_key]
-			if outputs is not None:
-				inputs = [o[0] for o in outputs]
-				parents = dagparts.flatten([o[1] for o in outputs])
-
-				parents.append(marg_nodes[bin_key])
-				likelihood_url = marg_nodes[bin_key].output_files["output"]
-				likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, likelihood_url, parents)
-
-	return likelihood_nodes
-
-def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments):
-	num_chunks = 100
-	innodes = {}
-
-	# after assigning the likelihoods, cluster and merge by sub bank and by whether or not it was an injection run
-	for (sim_tag, bin_key), (inputs, likelihood_url, parents) in sorted(likelihood_nodes.items()):
-		db = inputs_to_db(jobs, inputs, job_type = 'toSqliteNoCache')
-		xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz")
-		snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file
-		cluster_sql_file = options.cluster_sql_file if sim_tag is None else options.injection_sql_file
-
-		# cluster sub banks
-		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents,
-			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-			input_files = {"":inputs}
-			)
-
-		# merge sub banks
-		merge_node = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes = [cluster_node],
-			input_files = {"":inputs},
-			output_files = {"output":xml}
-			)
-
-		# cluster and simplify sub banks
-		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [merge_node],
-			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-			input_files = {"":xml}
-			)
-
-		# assign likelihoods
-		likelihood_node = dagparts.DAGNode(jobs['calcLikelihood'], dag,
-			parent_nodes = [cluster_node],
-			opts = {"tmp-space":dagparts.condor_scratch_space()},
-			input_files = {"likelihood-url":likelihood_url, "": xml}
-			)
-
-		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [likelihood_node],
-			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-			input_files = {"":xml},
-			output_files = {"database":db},
-		)
-		sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-			opts = {"sql-file": cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-			input_files = {"":db}
-		)
-
-		innodes.setdefault(sim_tag_from_inj_file(sim_tag) if sim_tag else None, []).append(sqlitenode)
-
-	# make sure innodes has a None key, even if its value is an empty list
-	# FIXME injection dag is broken
-	innodes.setdefault(None, [])
-
-	if options.vetoes is None:
-		vetoes = []
-	else:
-		vetoes = [options.vetoes]
-
-	chunk_nodes = []
-	dbs_to_delete = []
-	# Process the chirp mass bins in chunks to parallelize the merging process
-	for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
-		try:
-			dbs = [node.input_files[""] for node in nodes]
-			parents = nodes
-
-		except AttributeError:
-			# analysis started at merger step but seeded by lloid files which
-			# have already been merged into one file per background
-			# bin, thus the analysis will begin at this point
-			dbs = nodes
-			parents = []
-
-		dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-		noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
-
-		# Merge and cluster the final non injection database
-		noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file)
-		chunk_nodes.append(noninjsqlitenode)
-		dbs_to_delete.append(noninjdb)
-
-	# Merge the final non injection database
-	outnodes = []
-	injdbs = []
-	if options.non_injection_db: #### injection-only run
-		noninjdb = options.non_injection_db
-	else:
-		final_nodes = []
-		for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
-			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
-
-			# cluster the final non injection database
-			noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
-			final_nodes.append(noninjsqlitenode)
-
-		input_files = (vetoes + [options.frame_segments_file])
-		input_cache_files = [node.input_files[""] for node in final_nodes]
-		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
-		noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
-
-		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode],
-			input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
-		)
-
-		outnodes.append(cpnode)
-
-	if options.injections:
-		iterable_injections = options.injections
-	else:
-		iterable_injections = options.injections_for_merger
-
-	for injections in iterable_injections:
-		# extract only the nodes that were used for injections
-		chunk_nodes = []
-
-		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
-			try:
-				dbs = [injnode.input_files[""] for injnode in injnodes]
-				parents = injnodes
-			except AttributeError:
-				dbs = injnodes
-				parents = []
-
-			# Setup the final output names, etc.
-			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-			injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
-
-			# merge and cluster
-			clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file)
-			chunk_nodes.append(clusternode)
-			dbs_to_delete.append(injdb)
-
-
-		final_nodes = []
-		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
-			# Setup the final output names, etc.
-			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
-
-			# merge and cluster
-			clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.cluster_sql_file)
-			final_nodes.append(clusternode)
-
-		# Setup the final output names, etc.
-		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
-		injdbs.append(injdb)
-		injxml = injdb.replace('.sqlite','.xml.gz')
-
-		xml_input = injxml
-
-		# merge and cluster
-		parent_nodes = final_nodes + ligolw_add_nodes
-		input_files = (vetoes + [options.frame_segments_file, injections])
-		input_cache_files = [node.input_files[""] for node in final_nodes]
-		clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
-
-		clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
-			opts = {"tmp-space":dagparts.condor_scratch_space()},
-			output_files = {"extract":injxml},
-			input_files = {"database":injdb}
-		)
-
-		inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode],
-			opts = {"time-window":0.9},
-			input_files = {"":injxml}
-		)
-
-		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode],
-			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-			output_files = {"database":injdb},
-			input_files = {"":xml_input}
-		)
-
-		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode],
-			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
-		)
-
-		outnodes.append(cpnode)
-
-	return injdbs, noninjdb, outnodes, dbs_to_delete
-
-def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
-	ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes]
-	margjobs = [jobs['marginalize'], jobs['marginalizeWithZerolag']]
-	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
-	filesuffixs = ['', '_with_zerolag']
-
-	margnum = 16
-	all_margcache = []
-	all_margnodes = []
-	final_margnodes = []
-	for nodes, job, margfile, filesuffix in zip(ranknodes, margjobs, margfiles, filesuffixs):
-		try:
-			margin = [node.output_files["output"] for node in nodes]
-			parents = nodes
-		except AttributeError: ### analysis started at merger step
-			margin = nodes
-			parents = []
-
-		margnodes = []
-		margcache = []
-
-		# split up the marginalization into groups of
-		# margnum (currently 16) likelihood files
-		for margchunk in dagparts.groups(margin, margnum):
-			if nodes:
-				marg_ce = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margchunk]
-				margcache.append(dagparts.group_T050017_filename_from_T050017_files(marg_ce, '.xml.gz', path = job.output_path))
-				margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = parents,
-					opts = {"marginalize": "ranking-stat-pdf"},
-					output_files = {"output": margcache[-1]},
-					input_cache_files = {"likelihood-cache": margchunk},
-					input_cache_file_name = os.path.basename(margcache[-1]).replace('.xml.gz','.cache')
-				))
-
-		all_margcache.append(margcache)
-		all_margnodes.append(margnodes)
-
-	if not options.marginalized_likelihood_file: ### not an injection-only run
-		for nodes, job, margnodes, margcache, margfile, filesuffix in zip(ranknodes, margjobs, all_margnodes, all_margcache, margfiles, filesuffixs):
-			final_margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = margnodes,
-				opts = {"marginalize": "ranking-stat-pdf"},
-				output_files = {"output": "marginalized_likelihood%s.xml.gz"%filesuffix},
-				input_cache_files = {"likelihood-cache": margcache},
-				input_cache_file_name = "marginalized_likelihood%s.cache"%filesuffix
-			))
-
-	return final_margnodes, dagparts.flatten(all_margcache)
-
-def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes):
-	"""compute FAPs and FARs
-	"""
-	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
-	filesuffixs = ['', '_with_zerolag']
-
-	for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
-		if options.marginalized_likelihood_file: ### injection-only run
-			parents = final_sqlite_nodes
-			marginalized_likelihood_file = margfile
-
-		else:
-			parents = [margnode] + final_sqlite_nodes
-			marginalized_likelihood_file = margnode.output_files["output"]
-
-		farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents,
-			opts = {"tmp-space":dagparts.condor_scratch_space()},
-			input_files = {
-				"background-bins-file": marginalized_likelihood_file,
-				"injection-db": [injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs] if 'zerolag' in filesuffix else injdbs,
-				"non-injection-db": noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') if 'zerolag' in filesuffix else noninjdb
-			}
-		)
-
-		if 'zerolag' not in filesuffix:
-			outnode = farnode
-
-	return outnode
-
-def horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir):
-	"""calculate horizon distance
-	"""
-	dagparts.DAGNode(jobs['horizon'], dag,
-		parent_nodes = psd_nodes.values(),
-		input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]},
-		output_files = {"":dagparts.T050017_filename(instruments, "HORIZON", boundary_seg, '.png', path = output_dir)}
-	)
-
-def summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs):
-	"""create a summary page
-	"""
-	output_user_tags = ["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"]
-	output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs])
-	output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs])
-
-	dagparts.DAGNode(jobs['summaryPage'], dag, parent_nodes = plotnodes,
-		opts = {
-			"title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])),
-			"webserver-dir":options.web_dir,
-			"glob-path":output_dir,
-			"output-user-tag":output_user_tags
-		}
-	)
-
 
 #----------------------------------------------------------
 ### main
@@ -1458,24 +372,24 @@ if __name__ == '__main__':
 		dag.add_maxjobs_category("INSPIRAL", options.max_inspiral_jobs)
 
 	# generate xml integrity checker (if requested) and pre-script to back up data
-	set_up_scripts(options)
+	inspiral_pipe.set_up_scripts(options)
 
 	# Get mchirp boundaries of banks, maximum duration of templates, and analysis segments
 	if options.bank_cache or options.psd_cache:
 		if options.bank_cache:
-			template_mchirp_dict, bank_cache, max_time = get_bank_params(options)
+			template_mchirp_dict, bank_cache, max_time = inspiral_pipe.get_bank_params(options)
 			instrument_set = bank_cache.keys()
 
 		elif options.psd_cache:
 			template_mchirp_dict, svd_nodes, max_time = inspiral_pipe.get_svd_bank_params(options.svd_bank_cache)
 			instrument_set = svd_nodes.keys()
 
-		segsdict = analysis_segments(set(instrument_set), detectors.frame_segments, boundary_seg, max_time, options.min_instruments)
+		segsdict = inspiral_pipe.analysis_segments(set(instrument_set), detectors.frame_segments, boundary_seg, max_time, options.min_instruments)
 		instruments = "".join(sorted(instrument_set))
 
 	if options.psd_cache:
 		### reference psd jobs
-		psd_nodes, ref_psd_parent_nodes = inj_psd_layer(segsdict, options)
+		psd_nodes, ref_psd_parent_nodes = inspiral_pipe.inj_psd_layer(segsdict, options)
 
 	elif options.lloid_cache:
 		# starting analysis at merger step, nothing to do here
@@ -1483,68 +397,68 @@ if __name__ == '__main__':
 
 	elif options.reference_psd is None:
 		# Compute the PSDs for each segment
-		psd_nodes = ref_psd_layer(dag, jobs, [], segsdict, channel_dict, options)
+		psd_nodes = inspiral_pipe.ref_psd_layer(dag, jobs, [], segsdict, channel_dict, options)
 
 		# plot the horizon distance
-		horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir)
+		inspiral_pipe.horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir, instruments)
 
 		# compute the median PSD
-		median_psd_node = median_psd_layer(dag, jobs, psd_nodes, options)
+		median_psd_node = inspiral_pipe.median_psd_layer(dag, jobs, psd_nodes, options, boundary_seg, instruments)
 		ref_psd = median_psd_node.output_files["output-name"]
 		ref_psd_parent_nodes = [median_psd_node]
 
 	else:
 		# load reference PSD
-		ref_psd = load_reference_psd(options)
+		ref_psd = inspiral_pipe.load_reference_psd(options)
 		ref_psd_parent_nodes = []
 
 	# Calculate Expected SNR jobs
 	if not options.lloid_cache and not options.disable_calc_inj_snr:
-		ligolw_add_nodes = expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100)
+		ligolw_add_nodes = inspiral_pipe.expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100)
 	else:
 		ligolw_add_nodes = []
 
 	if options.bank_cache:
 		# Compute SVD banks
-		svd_nodes, template_mchirp_dict, svd_dtdphi_map = svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, template_mchirp_dict)
+		svd_nodes, template_mchirp_dict, svd_dtdphi_map = inspiral_pipe.svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, output_dir, template_mchirp_dict)
 
 		# mass model
-		model_node, model_file = mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd)
+		model_node, model_file = inspiral_pipe.mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd)
 	else:
 		model_node = None
 		model_file = options.mass_model_file
 
 	if not options.lloid_cache:
 		# Inspiral jobs by segment
-		inspiral_nodes, lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
+		inspiral_nodes, lloid_output, lloid_diststats = inspiral_pipe.inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
 
 		# marginalize jobs
-		marg_nodes = marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map)
+		marg_nodes = inspiral_pipe.marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map)
 
 		# calc rank PDF jobs
-		rankpdf_nodes, rankpdf_zerolag_nodes = calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
+		rankpdf_nodes, rankpdf_zerolag_nodes = inspiral_pipe.calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
 
 	# final marginalization step
-	final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes)
+	final_marg_nodes, margfiles_to_delete = inspiral_pipe.final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, options)
 
 	# likelihood jobs
-	likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set)
+	likelihood_nodes = inspiral_pipe.likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set)
 
 	# Setup clustering and/or merging
 	# after all of the likelihood ranking and preclustering is finished put everything into single databases based on the injection file (or lack thereof)
-	injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments)
+	injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = inspiral_pipe.sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, boundary_seg, instruments)
 
 	# Compute FAR
-	farnode = compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes)
+	farnode = inspiral_pipe.compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes, options)
 
 	# make summary plots
-	plotnodes = summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb)
+	plotnodes = inspiral_pipe.summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb, output_dir)
 
 	# make a web page
-	summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs)
+	inspiral_pipe.summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs, output_dir)
 
 	# rm intermediate merger products
-	clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete)
+	inspiral_pipe.clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete)
 
 	#
 	# generate DAG files
diff --git a/gstlal-inspiral/python/inspiral_pipe.py b/gstlal-inspiral/python/inspiral_pipe.py
index 4e982b8c0c..bbeafafaf0 100644
--- a/gstlal-inspiral/python/inspiral_pipe.py
+++ b/gstlal-inspiral/python/inspiral_pipe.py
@@ -1,4 +1,5 @@
 # Copyright (C) 2013--2014  Kipp Cannon, Chad Hanna
+# Copyright (C) 2019        Patrick Godwin
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the GNU General Public License as published by the
@@ -36,12 +37,961 @@
 # - In inspiral_pipe.py Fix the InspiralJob.__init__: fix the arguments
 # - On line 201, fix the comment or explain what the comment is meant to be
 
-import socket, copy, doctest
+#
+# imports
+#
+
+import copy
+import doctest
+import functools
+import itertools
+import os
+import socket
+import stat
+
+import lal.series
+from lal.utils import CacheEntry
+
 from ligo import segments
 from ligo.lw import lsctables, ligolw
 from ligo.lw import utils as ligolw_utils
+
+from gstlal import dagparts
+from gstlal import datasource
+from gstlal import inspiral
 from gstlal import svd_bank
-from lal.utils import CacheEntry
+
+
+#
+# LIGOLW initialization
+#
+
+
+class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
+	pass
+lsctables.use_in(LIGOLWContentHandler)
+
+
+#
+# DAG layers
+#
+
+
+def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options):
+	psd_nodes = {}
+	for ifos in segsdict:
+		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
+		for seg in segsdict[ifos]:
+			psd_path = subdir_path([jobs['refPSD'].output_path, str(int(seg[0]))[:5]])
+			psd_nodes[(ifos, seg)] = dagparts.DAGNode(
+				jobs['refPSD'],
+				dag,
+				parent_nodes = parent_nodes,
+				opts = {
+					"gps-start-time":int(seg[0]),
+					"gps-end-time":int(seg[1]),
+					"data-source":"frames",
+					"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
+					"psd-fft-length":options.psd_fft_length,
+					"frame-segments-name": options.frame_segments_name
+				},
+				input_files = {
+					"frame-cache":options.frame_cache,
+					"frame-segments-file":options.frame_segments_file
+				},
+				output_files = {
+					"write-psd":dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = psd_path)
+				},
+			)
+
+	# Make the reference PSD cache
+	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
+	with open('reference_psd.cache', "w") as output_cache_file:
+		for node in psd_nodes.values():
+			output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(node.output_files["write-psd"])))
+
+	return psd_nodes
+
+def median_psd_layer(dag, jobs, parent_nodes, options, boundary_seg, instruments):
+	gpsmod5 = str(int(boundary_seg[0]))[:5]
+	median_psd_path = subdir_path([jobs['medianPSD'].output_path, gpsmod5])
+
+	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
+	median_psd_nodes = []
+	for chunk, nodes in enumerate(dagparts.groups(parent_nodes.values(), 50)):
+		median_psd_node = \
+			dagparts.DAGNode(jobs['medianPSD'], dag,
+				parent_nodes = parent_nodes.values(),
+				input_files = {"": [node.output_files["write-psd"] for node in nodes]},
+				output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD_CHUNK_%04d" % chunk, boundary_seg, '.xml.gz', path = median_psd_path)}
+			)
+		median_psd_nodes.append(median_psd_node)
+
+	median_psd_node = \
+		dagparts.DAGNode(jobs['medianPSD'], dag,
+			parent_nodes = median_psd_nodes,
+			input_files = {"": [node.output_files["output-name"] for node in median_psd_nodes]},
+			output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([jobs['medianPSD'].output_path, gpsmod5]))}
+		)
+	return median_psd_node
+
+def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, output_dir, template_mchirp_dict):
+	svd_nodes = {}
+	new_template_mchirp_dict = {}
+	svd_dtdphi_map = {}
+	for ifo, list_of_svd_caches in bank_cache.items():
+		bin_offset = 0
+		for j, svd_caches in enumerate(list_of_svd_caches):
+			svd_caches = map(CacheEntry, open(svd_caches))
+			for i, individual_svd_cache in enumerate(ce.path for ce in svd_caches):
+				# First sort out the clipleft, clipright options
+				clipleft = []
+				clipright = []
+				ids = []
+				mchirp_interval = (float("inf"), 0)
+				individual_svd_cache = map(CacheEntry, open(individual_svd_cache))
+				for n, f in enumerate(ce.path for ce in individual_svd_cache):
+					# handle template bank clipping
+					clipleft.append(options.overlap[j] / 2)
+					clipright.append(options.overlap[j] / 2)
+					ids.append("%d_%d" % (i+bin_offset, n))
+					if f in template_mchirp_dict:
+						mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))
+					svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j]
+
+				svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path)
+				if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
+					new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval
+
+				svdnode = dagparts.DAGNode(
+					jobs['svd'],
+					dag,
+					parent_nodes = parent_nodes,
+					opts = {
+						"svd-tolerance":options.tolerance,
+						"flow":options.flow[j],
+						"sample-rate":options.sample_rate,
+						"clipleft":clipleft,
+						"clipright":clipright,
+						"samples-min":options.samples_min[j],
+						"samples-max-256":options.samples_max_256,
+						"samples-max-64":options.samples_max_64,
+						"samples-max":options.samples_max,
+						"autocorrelation-length":options.autocorrelation_length,
+						"bank-id":ids,
+						"identity-transform":options.identity_transform,
+						"ortho-gate-fap":0.5
+					},
+					input_files = {"reference-psd":psd},
+					input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]},
+					input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"),
+					output_files = {"write-svd":svd_bank_name},
+				)
+
+				# impose a priority to help with depth first submission
+				svdnode.set_priority(99)
+				svd_nodes.setdefault(ifo, []).append(svdnode)
+			bin_offset += i+1
+
+	# Plot template/svd bank jobs
+	primary_ifo = bank_cache.keys()[0]
+	dagparts.DAGNode(
+		jobs['plotBanks'],
+		dag,
+		parent_nodes = sum(svd_nodes.values(),[]),
+		opts = {"plot-template-bank":"", "output-dir": output_dir},
+		input_files = {"template-bank-file":options.template_bank},
+	)
+
+	return svd_nodes, new_template_mchirp_dict, svd_dtdphi_map
+
+
+def inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):
+	inspiral_nodes = {}
+	for ifos in segsdict:
+		# FIXME: handles more than 3 ifos with same cpu/memory requests
+		inspiral_name = 'gstlalInspiral%dIFO' % min(len(ifos), 3)
+		inspiral_inj_name = 'gstlalInspiralInj%dIFO' % min(len(ifos), 3)
+
+		# setup dictionaries to hold the inspiral nodes
+		inspiral_nodes[(ifos, None)] = {}
+		ignore = {}
+		injection_files = []
+		for injections in options.injections:
+			min_chirp_mass, max_chirp_mass, injections = injections.split(':')
+			injection_files.append(injections)
+			min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass)
+			inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
+			ignore[injections] = []
+			for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)):
+				if max_chirp_mass <= bounds[0]:
+					ignore[injections].append(int(bgbin_index))
+					# NOTE a break could be placed here under the assumption
+					# that the min chirp mass in a subbank increases with bin
+					# number, i.e. XXXX+1 has a greater minimum chirpmass than
+					# XXXX, for all XXXX.  Note that the reverse is not true;
+					# bin XXXX+1 may have a lower max chirpmass than bin XXXX.
+				elif min_chirp_mass > bounds[1]:
+					ignore[injections].append(int(bgbin_index))
+
+		# FIXME choose better splitting?
+		numchunks = 50
+
+		# only use a channel dict with the relevant channels
+		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
+
+		# get the svd bank strings
+		svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys())
+
+		# get a mapping between chunk counter and bgbin for setting priorities
+		bgbin_chunk_map = {}
+
+		for seg in segsdict[ifos]:
+			if injection_files:
+				output_seg_inj_path = subdir_path([jobs[inspiral_inj_name].output_path, str(int(seg[0]))[:5]])
+
+			if jobs[inspiral_name] is None:
+				# injection-only run
+				inspiral_nodes[(ifos, None)].setdefault(seg, [None])
+
+			else:
+				output_seg_path = subdir_path([jobs[inspiral_name].output_path, str(int(seg[0]))[:5]])
+				for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)):
+					bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)]
+					# setup output names
+					output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)]
+					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
+					dist_stat_names = [dagparts.T050017_filename(ifos, '%s_DIST_STATS' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
+
+					for bgbin in bgbin_indices:
+						bgbin_chunk_map.setdefault(bgbin, chunk_counter)
+
+					# Calculate the appropriate ht-gate-threshold values according to the scale given
+					threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)
+
+					# non injection node
+					noninjnode = dagparts.DAGNode(jobs[inspiral_name], dag,
+						parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]),
+						opts = {
+							"psd-fft-length":options.psd_fft_length,
+							"ht-gate-threshold":threshold_values,
+							"frame-segments-name":options.frame_segments_name,
+							"gps-start-time":int(seg[0]),
+							"gps-end-time":int(seg[1]),
+							"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
+							"tmp-space":dagparts.condor_scratch_space(),
+							"track-psd":"",
+							"control-peak-time":options.control_peak_time,
+							"coincidence-threshold":options.coincidence_threshold,
+							"singles-threshold":options.singles_threshold,
+							"fir-stride":options.fir_stride,
+							"data-source":"frames",
+							"local-frame-caching":"",
+							"min-instruments":options.min_instruments,
+							"reference-likelihood-file":options.reference_likelihood_file
+						},
+						input_files = {
+							"time-slide-file":options.time_slide_file,
+							"frame-cache":options.frame_cache,
+							"frame-segments-file":options.frame_segments_file,
+							"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
+							"blind-injections":options.blind_injections,
+							"veto-segments-file":options.vetoes,
+						},
+						input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)},
+						output_cache_files = {
+							"output-cache":output_names,
+							"ranking-stat-output-cache":dist_stat_names
+						}
+					)
+
+					# Set a post script to check for file integrity
+					if options.gzip_test:
+						noninjnode.set_post_script("gzip_test.sh")
+						noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))
+
+					# impose a priority to help with depth first submission
+					noninjnode.set_priority(chunk_counter+15)
+
+					inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)
+
+			# process injections
+			for injections in injection_files:
+				# setup output names
+				sim_name = sim_tag_from_inj_file(injections)
+
+				bgbin_svd_bank_strings = [bgbin_svdbank for i, bgbin_svdbank in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]]
+
+				for chunk_counter, bgbin_list in enumerate(dagparts.groups(bgbin_svd_bank_strings, numchunks)):
+					bgbin_indices, svd_bank_strings = zip(*bgbin_list)
+					output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices]
+					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (idx, sim_name), seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
+					svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))]
+					try:
+						reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
+						parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
+					except AttributeError: ### injection-only run
+						reference_psd = psd_nodes[(ifos, seg)]
+						parents = []
+
+					svd_files = [CacheEntry.from_T050017(filename) for filename in svd_names]
+					input_cache_name = dagparts.group_T050017_filename_from_T050017_files(svd_files, '.cache').replace('SVD', 'SVD_%s' % sim_name)
+
+					# Calculate the appropriate ht-gate-threshold values according to the scale given
+					threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)
+
+					# setup injection node
+					# FIXME: handles more than 3 ifos with same cpu/memory requests
+					injnode = dagparts.DAGNode(jobs[inspiral_inj_name], dag,
+						parent_nodes = parents,
+						opts = {
+							"psd-fft-length":options.psd_fft_length,
+							"ht-gate-threshold":threshold_values,
+							"frame-segments-name":options.frame_segments_name,
+							"gps-start-time":int(seg[0]),
+							"gps-end-time":int(seg[1]),
+							"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
+							"tmp-space":dagparts.condor_scratch_space(),
+							"track-psd":"",
+							"control-peak-time":options.control_peak_time,
+							"coincidence-threshold":options.coincidence_threshold,
+							"singles-threshold":options.singles_threshold,
+							"fir-stride":options.fir_stride,
+							"data-source":"frames",
+							"local-frame-caching":"",
+							"min-instruments":options.min_instruments,
+							"reference-likelihood-file":options.reference_likelihood_file
+						},
+						input_files = {
+							"time-slide-file":options.inj_time_slide_file,
+							"frame-cache":options.frame_cache,
+							"frame-segments-file":options.frame_segments_file,
+							"reference-psd":reference_psd,
+							"veto-segments-file":options.vetoes,
+							"injections": injections
+						},
+						input_cache_files = {"svd-bank-cache":svd_names},
+						input_cache_file_name = input_cache_name,
+						output_cache_files = {"output-cache":output_names}
+					)
+					# Set a post script to check for file integrity
+					if options.gzip_test:
+						injnode.set_post_script("gzip_test.sh")
+						injnode.add_post_script_arg(" ".join(output_names))
+
+					# impose a priority to help with depth first submission
+					if bgbin_chunk_map:
+						injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1)
+					else:
+						injnode.set_priority(chunk_counter+1)
+
+					inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)
+
+	# Replace mchirplo:mchirphi:inj.xml with inj.xml
+	options.injections = [inj.split(':')[-1] for inj in options.injections]
+
+	# NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
+	lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)
+
+	return inspiral_nodes, lloid_output, lloid_diststats
+
+
+def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs):
+	ligolw_add_nodes = []
+	for inj in options.injections:
+		inj_snr_nodes = []
+
+		inj_splitter_node = dagparts.DAGNode(jobs['injSplitter'], dag, parent_nodes=[],
+			opts = {
+				"output-path":jobs['injSplitter'].output_path,
+				"usertag": sim_tag_from_inj_file(inj.split(":")[-1]),
+				"nsplit": num_split_inj_snr_jobs
+			},
+			input_files = {"": inj.split(":")[-1]}
+		)
+		inj_splitter_node.set_priority(98)
+
+		# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
+		injection_files = ["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)]
+		for injection_file in injection_files:
+			injSNRnode = dagparts.DAGNode(jobs['gstlalInjSnr'], dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node],
+				# FIXME somehow choose the actual flow based on mass?
+				# max(flow) is chosen for performance, not correctness;
+				# hopefully it will be good enough
+				opts = {"flow":max(options.flow),"fmax":options.fmax},
+				input_files = {
+					"injection-file": injection_file,
+					"reference-psd-cache": "reference_psd.cache"
+				}
+			)
+			injSNRnode.set_priority(98)
+			inj_snr_nodes.append(injSNRnode)
+
+		addnode = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes=inj_snr_nodes,
+			input_files = {"": ' '.join(injection_files)},
+			output_files = {"output": inj.split(":")[-1]}
+		)
+
+		ligolw_add_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [addnode],
+			opts = {"sql-file":options.injection_proc_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":addnode.output_files["output"]}
+			)
+		)
+	return ligolw_add_nodes
+
+
+def summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb, output_dir):
+	plotnodes = []
+
+	### common plot options
+	common_plot_opts = {
+		"segments-name": options.frame_segments_name,
+		"tmp-space": dagparts.condor_scratch_space(),
+		"output-dir": output_dir,
+		"likelihood-file":"post_marginalized_likelihood.xml.gz",
+		"shrink-data-segments": 32.0,
+		"extend-veto-segments": 8.,
+	}
+	sensitivity_opts = {
+		"output-dir":output_dir,
+		"tmp-space":dagparts.condor_scratch_space(),
+		"veto-segments-name":"vetoes",
+		"bin-by-source-type":"",
+		"dist-bins":200,
+		"data-segments-name":"datasegments"
+	}
+
+	### plot summary
+	opts = {"user-tag": "ALL_LLOID_COMBINED", "remove-precession": ""}
+	opts.update(common_plot_opts)
+	plotnodes.append(dagparts.DAGNode(jobs['plotSummary'], dag, parent_nodes=[farnode],
+		opts = opts,
+		input_files = {"": [noninjdb] + injdbs}
+	))
+
+	### isolated precession plot summary
+	opts = {"user-tag": "PRECESSION_LLOID_COMBINED", "isolate-precession": "", "plot-group": 1}
+	opts.update(common_plot_opts)
+	plotnodes.append(dagparts.DAGNode(jobs['plotSummaryIsolatePrecession'], dag, parent_nodes=[farnode],
+		opts = opts,
+		input_files = {"":[noninjdb] + injdbs}
+	))
+
+	for injdb in injdbs:
+		### individual injection plot summary
+		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1], "remove-precession": "", "plot-group": 1}
+		opts.update(common_plot_opts)
+		plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummary'], dag, parent_nodes=[farnode],
+			opts = opts,
+			input_files = {"":[noninjdb] + [injdb]}
+		))
+
+		### isolated precession injection plot summary
+		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "isolate-precession": "", "plot-group": 1}
+		opts.update(common_plot_opts)
+		plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummaryIsolatePrecession'], dag, parent_nodes=[farnode],
+			opts = opts,
+			input_files = {"":[noninjdb] + [injdb]}
+		))
+
+	### sensitivity plots
+	opts = {"user-tag": "ALL_LLOID_COMBINED"}
+	opts.update(sensitivity_opts)
+	plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode],
+		opts = opts,
+		input_files = {"zero-lag-database": noninjdb, "": injdbs}
+	))
+
+	for injdb in injdbs:
+		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1]}
+		opts.update(sensitivity_opts)
+		plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode],
+			opts = opts,
+			input_files = {"zero-lag-database": noninjdb, "": injdb}
+		))
+
+	### background plots
+	plotnodes.append(dagparts.DAGNode(jobs['plotBackground'], dag, parent_nodes = [farnode],
+		opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir},
+		input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb}
+	))
+
+	return plotnodes
+
+
+def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete):
+	"""clean intermediate merger products
+	"""
+	for db in dbs_to_delete:
+		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
+			input_files = {"": db}
+		)
+
+	for margfile in margfiles_to_delete:
+		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
+			input_files = {"": margfile}
+		)
+	return None
+
+
+def inj_psd_layer(segsdict, options):
+	psd_nodes = {}
+	psd_cache_files = {}
+	for ce in map(CacheEntry, open(options.psd_cache)):
+		psd_cache_files.setdefault(frozenset(lsctables.instrumentsproperty.get(ce.observatory)), []).append((ce.segment, ce.path))
+	for ifos in segsdict:
+		reference_psd_files = sorted(psd_cache_files[ifos], key = lambda (s, p): s)
+		ref_psd_file_num = 0
+		for seg in segsdict[ifos]:
+			while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]):
+				ref_psd_file_num += 1
+			psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1]
+	ref_psd_parent_nodes = []
+	return psd_nodes, ref_psd_parent_nodes
+
+
+def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
+	"""mass model node
+	"""
+	if options.mass_model_file is None:
+		# choose, arbitrarily, the lowest instrument in alphabetical order
+		model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = jobs['model'].output_path)
+		model_node = dagparts.DAGNode(jobs['model'], dag,
+			input_files = {"template-bank": options.template_bank, "reference-psd": psd},
+			opts = {"model":options.mass_model},
+			output_files = {"output": model_file_name},
+			parent_nodes = parent_nodes
+		)
+		return [model_node], model_file_name
+	else:
+		return [], options.mass_model_file
+
+
+def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None):
+	"""merge and cluster from sqlite database
+	"""
+	if input_files:
+		input_ = {"": input_files}
+	else:
+		input_ = {}
+
+	# Merge database into chunks
+	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
+		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+		input_files = input_,
+		input_cache_files = {"input-cache": db_cache},
+		output_files = {"database":db},
+		input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
+	)
+
+	# cluster database
+	return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
+		opts = {"sql-file": sqlfile, "tmp-space": dagparts.condor_scratch_space()},
+		input_files = {"": db}
+	)
+
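+# NOTE a minimal usage sketch for merge_cluster_layer() (the database and
+# cache file names below are hypothetical, illustrative only):
+#
+#	clusternode = merge_cluster_layer(
+#		dag, jobs, parent_nodes,
+#		"H1L1-ALL_LLOID-1187000000-100000.sqlite",	# merged database to produce
+#		["H1L1-0000_LLOID-1187000000-100000.sqlite"],	# databases to merge
+#		options.cluster_sql_file,
+#	)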
+
+def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map):
+	instruments = "".join(sorted(instrument_set))
+	margnodes = {}
+
+	# NOTE! we rely on there being identical templates in each instrument,
+	# so we just take one of the values of svd_nodes, which is a dictionary
+	# keyed by ifo.
+	# FIXME the svd node list has to be the same as the sorted keys of
+	# lloid_output.  svd_nodes should be made into a dictionary much
+	# earlier in the code to prevent a mishap
+	one_ifo_svd_nodes = dict(("%04d" % n, node) for n, node in enumerate( svd_nodes.values()[0]))
+
+	# Here n counts the bins
+	# FIXME - this is broken for injection dags right now because of marg nodes
+	# first non-injections, which will get skipped if this is an injections-only run
+	for bin_key in sorted(lloid_output[None].keys()):
+		outputs = lloid_output[None][bin_key]
+		diststats = lloid_diststats[bin_key]
+		inputs = [o[0] for o in outputs]
+		parents = dagparts.flatten([o[1] for o in outputs])
+		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)
+
+		# FIXME we keep this here in case we someday want to have a
+		# mass bin dependent prior, but it really doesn't matter for
+		# the time being.
+		priornode = dagparts.DAGNode(jobs['createPriorDistStats'], dag,
+			parent_nodes = [one_ifo_svd_nodes[bin_key]] + model_node or [],
+			opts = {
+				"instrument": instrument_set,
+				"background-prior": 1,
+				"min-instruments": options.min_instruments,
+				"coincidence-threshold":options.coincidence_threshold,
+				"df": "bandwidth"
+			},
+			input_files = {
+				"svd-file": one_ifo_svd_nodes[bin_key].output_files["write-svd"],
+				"mass-model-file": model_file,
+				"dtdphi-file": svd_dtdphi_map[bin_key],
+				"psd-xml": ref_psd
+			},
+			output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=jobs['createPriorDistStats'])}
+		)
+		# Create a file that has the priors *and* all of the diststats
+		# for a given bin marginalized over time. This is all that will
+		# be needed to compute the likelihood
+		diststats_per_bin_node = dagparts.DAGNode(jobs['marginalize'], dag,
+			parent_nodes = [priornode] + parents,
+			opts = {"marginalize": "ranking-stat"},
+			input_cache_files = {"likelihood-cache": diststats + [priornode.output_files["write-likelihood"]]},
+			output_files = {"output": rankfile('MARG_DIST_STATS', job=jobs['marginalize'])},
+			input_cache_file_name = rankfile('MARG_DIST_STATS')
+		)
+
+		margnodes[bin_key] = diststats_per_bin_node
+
+	return margnodes
+
+
+def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set):
+	rankpdf_nodes = []
+	rankpdf_zerolag_nodes = []
+	instruments = "".join(sorted(instrument_set))
+
+	# Here n counts the bins
+	for bin_key in sorted(marg_nodes.keys()):
+		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)
+
+		calcranknode = dagparts.DAGNode(jobs['calcRankPDFs'], dag,
+			parent_nodes = [marg_nodes[bin_key]],
+			opts = {"ranking-stat-samples":options.ranking_stat_samples},
+			input_files = {"": marg_nodes[bin_key].output_files["output"]},
+			output_files = {"output": rankfile('CALC_RANK_PDFS', job=jobs['calcRankPDFs'])},
+		)
+
+		calcrankzerolagnode = dagparts.DAGNode(jobs['calcRankPDFsWithZerolag'], dag,
+			parent_nodes = [marg_nodes[bin_key]],
+			opts = {"add-zerolag-to-background": "", "ranking-stat-samples": options.ranking_stat_samples},
+			input_files = {"": marg_nodes[bin_key].output_files["output"]},
+			output_files = {"output": rankfile('CALC_RANK_PDFS_WZL', job=jobs['calcRankPDFsWithZerolag'])},
+		)
+
+		rankpdf_nodes.append(calcranknode)
+		rankpdf_zerolag_nodes.append(calcrankzerolagnode)
+
+	return rankpdf_nodes, rankpdf_zerolag_nodes
+
+
+def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
+	likelihood_nodes = {}
+	instruments = "".join(sorted(instrument_set))
+
+	# non-injection jobs
+	for bin_key in sorted(lloid_output[None].keys()):
+		outputs = lloid_output[None][bin_key]
+		diststats = lloid_diststats[bin_key]
+		inputs = [o[0] for o in outputs]
+
+		# (input files for next job, dist stat files, parents for next job)
+		likelihood_nodes[None, bin_key] = (inputs, marg_nodes[bin_key].output_files["output"], [marg_nodes[bin_key]])
+
+	# injection jobs
+	for inj in options.injections:
+		lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
+		for bin_key in sorted(lloid_nodes.keys()):
+			outputs = lloid_nodes[bin_key]
+			diststats = lloid_diststats[bin_key]
+			if outputs is not None:
+				inputs = [o[0] for o in outputs]
+				parents = dagparts.flatten([o[1] for o in outputs])
+
+				parents.append(marg_nodes[bin_key])
+				likelihood_url = marg_nodes[bin_key].output_files["output"]
+				likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, likelihood_url, parents)
+
+	return likelihood_nodes
+
+
+def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, boundary_seg, instruments):
+	num_chunks = 100
+	innodes = {}
+
+	# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
+	for (sim_tag, bin_key), (inputs, likelihood_url, parents) in sorted(likelihood_nodes.items()):
+		db = inputs_to_db(jobs, inputs, job_type = 'toSqliteNoCache')
+		xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz")
+		snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file
+		cluster_sql_file = options.cluster_sql_file if sim_tag is None else options.injection_sql_file
+
+		# cluster sub banks
+		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents,
+			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":inputs}
+			)
+
+		# merge sub banks
+		merge_node = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes = [cluster_node],
+			input_files = {"":inputs},
+			output_files = {"output":xml}
+			)
+
+		# cluster and simplify sub banks
+		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [merge_node],
+			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":xml}
+			)
+
+		# assign likelihoods
+		likelihood_node = dagparts.DAGNode(jobs['calcLikelihood'], dag,
+			parent_nodes = [cluster_node],
+			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"likelihood-url":likelihood_url, "": xml}
+			)
+
+		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [likelihood_node],
+			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":xml},
+			output_files = {"database":db},
+		)
+		sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
+			opts = {"sql-file": cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":db}
+		)
+
+		innodes.setdefault(sim_tag_from_inj_file(sim_tag) if sim_tag else None, []).append(sqlitenode)
+
+	# make sure innodes has a None key, even if its value is an empty list
+	# FIXME injection dag is broken
+	innodes.setdefault(None, [])
+
+	if options.vetoes is None:
+		vetoes = []
+	else:
+		vetoes = [options.vetoes]
+
+	chunk_nodes = []
+	dbs_to_delete = []
+	# Process the chirp mass bins in chunks to parallelize the merging process
+	for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
+		try:
+			dbs = [node.input_files[""] for node in nodes]
+			parents = nodes
+
+		except AttributeError:
+			# analysis started at the merger step, seeded by lloid files that
+			# have already been merged into one file per background bin; the
+			# analysis will begin at this point
+			dbs = nodes
+			parents = []
+
+		dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
+		noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
+
+		# Merge and cluster the final non injection database
+		noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file)
+		chunk_nodes.append(noninjsqlitenode)
+		dbs_to_delete.append(noninjdb)
+
+	# Merge the final non injection database
+	outnodes = []
+	injdbs = []
+	if options.non_injection_db: #### injection-only run
+		noninjdb = options.non_injection_db
+	else:
+		final_nodes = []
+		for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
+			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
+
+			# cluster the final non injection database
+			noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
+			final_nodes.append(noninjsqlitenode)
+
+		input_files = (vetoes + [options.frame_segments_file])
+		input_cache_files = [node.input_files[""] for node in final_nodes]
+		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
+		noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
+
+		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode],
+			input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
+		)
+
+		outnodes.append(cpnode)
+
+	if options.injections:
+		iterable_injections = options.injections
+	else:
+		iterable_injections = options.injections_for_merger
+
+	for injections in iterable_injections:
+		# extract only the nodes that were used for injections
+		chunk_nodes = []
+
+		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
+			try:
+				dbs = [injnode.input_files[""] for injnode in injnodes]
+				parents = injnodes
+			except AttributeError:
+				dbs = injnodes
+				parents = []
+
+			# Setup the final output names, etc.
+			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
+			injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
+
+			# merge and cluster
+			clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file)
+			chunk_nodes.append(clusternode)
+			dbs_to_delete.append(injdb)
+
+
+		final_nodes = []
+		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
+			# Setup the final output names, etc.
+			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
+
+			# merge and cluster
+			clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.cluster_sql_file)
+			final_nodes.append(clusternode)
+
+		# Setup the final output names, etc.
+		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
+		injdbs.append(injdb)
+		injxml = injdb.replace('.sqlite','.xml.gz')
+
+		xml_input = injxml
+
+		# merge and cluster
+		parent_nodes = final_nodes + ligolw_add_nodes
+		input_files = (vetoes + [options.frame_segments_file, injections])
+		input_cache_files = [node.input_files[""] for node in final_nodes]
+		clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
+
+		clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
+			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			output_files = {"extract":injxml},
+			input_files = {"database":injdb}
+		)
+
+		inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode],
+			opts = {"time-window":0.9},
+			input_files = {"":injxml}
+		)
+
+		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode],
+			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+			output_files = {"database":injdb},
+			input_files = {"":xml_input}
+		)
+
+		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode],
+			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
+		)
+
+		outnodes.append(cpnode)
+
+	return injdbs, noninjdb, outnodes, dbs_to_delete
+
+
+def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, options):
+	ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes]
+	margjobs = [jobs['marginalize'], jobs['marginalizeWithZerolag']]
+	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
+	filesuffixs = ['', '_with_zerolag']
+
+	margnum = 16
+	all_margcache = []
+	all_margnodes = []
+	final_margnodes = []
+	for nodes, job, margfile, filesuffix in zip(ranknodes, margjobs, margfiles, filesuffixs):
+		try:
+			margin = [node.output_files["output"] for node in nodes]
+			parents = nodes
+		except AttributeError: ### analysis started at merger step
+			margin = nodes
+			parents = []
+
+		margnodes = []
+		margcache = []
+
+		# split up the marginalization into groups of margnum (16)
+		for margchunk in dagparts.groups(margin, margnum):
+			if nodes:
+				marg_ce = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margchunk]
+				margcache.append(dagparts.group_T050017_filename_from_T050017_files(marg_ce, '.xml.gz', path = job.output_path))
+				margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = parents,
+					opts = {"marginalize": "ranking-stat-pdf"},
+					output_files = {"output": margcache[-1]},
+					input_cache_files = {"likelihood-cache": margchunk},
+					input_cache_file_name = os.path.basename(margcache[-1]).replace('.xml.gz','.cache')
+				))
+
+		all_margcache.append(margcache)
+		all_margnodes.append(margnodes)
+
+	if not options.marginalized_likelihood_file: ### not an injection-only run
+		for nodes, job, margnodes, margcache, margfile, filesuffix in zip(ranknodes, margjobs, all_margnodes, all_margcache, margfiles, filesuffixs):
+			final_margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = margnodes,
+				opts = {"marginalize": "ranking-stat-pdf"},
+				output_files = {"output": "marginalized_likelihood%s.xml.gz"%filesuffix},
+				input_cache_files = {"likelihood-cache": margcache},
+				input_cache_file_name = "marginalized_likelihood%s.cache"%filesuffix
+			))
+
+	return final_margnodes, dagparts.flatten(all_margcache)
+
+
+def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes, options):
+	"""compute FAPs and FARs
+	"""
+	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
+	filesuffixs = ['', '_with_zerolag']
+
+	for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
+		if options.marginalized_likelihood_file: ### injection-only run
+			parents = final_sqlite_nodes
+			marginalized_likelihood_file = margfile
+
+		else:
+			parents = [margnode] + final_sqlite_nodes
+			marginalized_likelihood_file = margnode.output_files["output"]
+
+		farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents,
+			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			input_files = {
+				"background-bins-file": marginalized_likelihood_file,
+				"injection-db": [injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs] if 'zerolag' in filesuffix else injdbs,
+				"non-injection-db": noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') if 'zerolag' in filesuffix else noninjdb
+			}
+		)
+
+		if 'zerolag' not in filesuffix:
+			outnode = farnode
+
+	return outnode
+
+
+def horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir, instruments):
+	"""calculate horizon distance
+	"""
+	dagparts.DAGNode(jobs['horizon'], dag,
+		parent_nodes = psd_nodes.values(),
+		input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]},
+		output_files = {"":dagparts.T050017_filename(instruments, "HORIZON", boundary_seg, '.png', path = output_dir)}
+	)
+
+
+def summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs, output_dir):
+	"""create a summary page
+	"""
+	output_user_tags = ["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"]
+	output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs])
+	output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs])
+
+	dagparts.DAGNode(jobs['summaryPage'], dag, parent_nodes = plotnodes,
+		opts = {
+			"title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])),
+			"webserver-dir":options.web_dir,
+			"glob-path":output_dir,
+			"output-user-tag":output_user_tags
+		}
+	)
 
 
 #
@@ -68,6 +1018,47 @@ def webserver_url():
 	raise NotImplementedError("I don't know where the webserver is for this environment")
 
 
+#
+# DAG utilities
+#
+
+
+def get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options):
+	"""Calculate the appropriate ht-gate-threshold values according to the scale given
+	"""
+	if options.ht_gate_threshold_linear is not None:
+		# A scale is given
+		mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
+		# use max mchirp in a given svd bank to decide gate threshold
+		bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
+		gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)
+		return [gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
+	elif options.ht_gate_threshold is not None:
+		return [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
+	else:
+		return None
+
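+# NOTE a minimal sketch of the --ht-gate-threshold-linear convention parsed
+# above (the numbers are hypothetical, illustrative only):
+#
+#	options.ht_gate_threshold_linear = "0.8:15.0-45.0:100.0"
+#
+# parses to (mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max)
+# = (0.8, 15.0, 45.0, 100.0), and each svd bin is assigned a threshold by
+# linear interpolation in the maximum mchirp of its bank:
+#
+#	threshold = (100.0 - 15.0) / (45.0 - 0.8) * (bank_mchirp - 0.8) + 15.0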
+
+def inputs_to_db(jobs, inputs, job_type = 'toSqlite'):
+	dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
+	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
+	return os.path.join(subdir_path([jobs[job_type].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
+
+
+def cache_to_db(cache, jobs):
+	hi_index = cache[-1].description.split('_')[0]
+	db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path))
+	# FIXME "options" is not defined in this module; it should be passed in
+	# explicitly now that this helper lives outside gstlal_inspiral_pipe
+	db = db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
+	return db
+
+
+def get_rank_file(instruments, boundary_seg, n, basename, job=None):
+	if job:
+		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path)
+	else:
+		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.cache')
+
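+# NOTE get_rank_file() is meant to be curried per bin, as in
+# marginalize_layer() and calc_rank_pdf_layer() above:
+#
+#	rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)
+#	rankfile('MARG_DIST_STATS', job=jobs['marginalize'])	# .xml.gz path in the job's output directory
+#	rankfile('MARG_DIST_STATS')				# matching .cache file name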
+
 #
 # Utility functions
 #
@@ -176,6 +1167,156 @@ def get_svd_bank_params(svd_bank_cache, online = False):
 		return template_mchirp_dict
 
 
+def sim_tag_from_inj_file(injections):
+	if injections is None:
+		return None
+	return injections.replace('.xml', '').replace('.gz', '').replace('-','_')
+
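+# NOTE a minimal sketch of the tag convention above (the injection file name
+# is hypothetical, illustrative only):
+#
+#	>>> sim_tag_from_inj_file('bns_injections-1126051217-1129383017.xml.gz')
+#	'bns_injections_1126051217_1129383017'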
+
+def get_bank_params(options, verbose = False):
+	bank_cache = {}
+	for bank_cache_str in options.bank_cache:
+		for c in bank_cache_str.split(','):
+			ifo = c.split("=")[0]
+			cache = c.replace(ifo+"=","")
+			bank_cache.setdefault(ifo, []).append(cache)
+
+	max_time = 0
+	template_mchirp_dict = {}
+	for n, cache in enumerate(bank_cache.values()[0]):
+		for ce in map(CacheEntry, open(cache)):
+			for ce in map(CacheEntry, open(ce.path)):
+				xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler)
+				snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc)
+				max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration')))
+				idx = options.overlap[n]/2
+				template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[idx:-idx]), max(snglinspiraltable.getColumnByName('mchirp')[idx:-idx])]
+				xmldoc.unlink()
+
+	return template_mchirp_dict, bank_cache, max_time
+
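+# NOTE a minimal sketch of the --bank-cache format parsed above (the cache
+# file names are hypothetical, illustrative only):
+#
+#	--bank-cache H1=H1_split_bank.cache,L1=L1_split_bank.cache
+#
+# yields bank_cache = {'H1': ['H1_split_bank.cache'], 'L1': ['L1_split_bank.cache']}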
+
+def subdir_path(dirlist):
+	output_path = '/'.join(dirlist)
+	try:
+		os.mkdir(output_path)
+	except OSError:
+		# directory already exists
+		pass
+	return output_path
+
+
+def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
+	"""get a dictionary of all the disjoint 2+ detector combination segments
+	"""
+	segsdict = segments.segmentlistdict()
+	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
+	start_pad = 512 + max_template_length
+	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
+	segment_length = int(5 * start_pad)
+	for n in range(min_instruments, 1 + len(analyzable_instruments_set)):
+		for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n):
+			segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos))
+			segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg])
+			segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad)
+			segsdict[frozenset(ifo_combos)] = dagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad)
+			if not segsdict[frozenset(ifo_combos)]:
+				del segsdict[frozenset(ifo_combos)]
+	return segsdict
+
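+# NOTE a minimal sketch of the segment bookkeeping above (illustrative only):
+# with analyzable_instruments_set = set(["H1", "L1", "V1"]) and the default
+# min_instruments = 2, segsdict is keyed by the frozensets
+#
+#	frozenset(["H1", "L1"]), frozenset(["H1", "V1"]),
+#	frozenset(["L1", "V1"]), frozenset(["H1", "L1", "V1"])
+#
+# each holding only the (protracted, chunked) time when exactly that
+# combination of detectors is analyzable; empty combinations are removed.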
+
+def create_svd_bank_strings(svd_nodes, instruments = None):
+	# FIXME assumes that the number of svd nodes is the same per ifo, which is a good assumption
+	outstrings = []
+	for i in range(len(svd_nodes.values()[0])):
+		svd_bank_string = ""
+		for ifo in svd_nodes:
+			if instruments is not None and ifo not in instruments:
+				continue
+			try:
+				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"])
+			except AttributeError:
+				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i])
+		svd_bank_string = svd_bank_string.strip(",")
+		outstrings.append(svd_bank_string)
+	return outstrings
+
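+# NOTE a minimal sketch of the string format produced above and consumed by
+# svd_bank_cache_maker() below via inspiral.parse_svdbank_string() (the file
+# names are hypothetical, illustrative only):
+#
+#	"H1:H1-0000_SVD-1126051217-100000.xml.gz,L1:L1-0000_SVD-1126051217-100000.xml.gz"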
+
+def svd_bank_cache_maker(svd_bank_strings, injection = False):
+	if injection:
+		dir_name = "gstlal_inspiral_inj"
+	else:
+		dir_name = "gstlal_inspiral"
+	svd_cache_entries = []
+	parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
+	for svd_bank_parsed_dict in parsed_svd_bank_strings:
+		for filename in svd_bank_parsed_dict.itervalues():
+			svd_cache_entries.append(CacheEntry.from_T050017(filename))
+
+	return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries]
+
+
+def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
+	# first get the previous output in a usable form
+	lloid_output = {}
+	for inj in options.injections + [None]:
+		lloid_output[sim_tag_from_inj_file(inj)] = {}
+	lloid_diststats = {}
+	if options.dist_stats_cache:
+		for ce in map(CacheEntry, open(options.dist_stats_cache)):
+			lloid_diststats[ce.description.split("_")[0]] = [ce.path]
+	for ifos in segsdict:
+		for seg in segsdict[ifos]:
+			# iterate over the mass space chunks for each segment
+			for node in inspiral_nodes[(ifos, None)][seg]:
+				if node is None:
+					break
+				len_out_files = len(node.output_files["output-cache"])
+				for f in node.output_files["output-cache"]:
+					# Store the output files and the node for use as a parent dependency
+					lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node]))
+				for f in node.output_files["ranking-stat-output-cache"]:
+					lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f)
+			for inj in options.injections:
+				for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]:
+					if injnode is None:
+						continue
+					for f in injnode.output_files["output-cache"]:
+						# Store the output files and the node and injnode for use as parent dependencies
+						bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
+						try:
+							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode]))
+						except KeyError:
+							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode]))
+
+	return lloid_output, lloid_diststats
+
+
+def set_up_scripts(options):
+	# Make an xml integrity checker
+	if options.gzip_test:
+		with open("gzip_test.sh", "w") as f:
+			f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
+		os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
+
+	# A pre script to backup data before feeding to lossy programs
+	# (e.g. clustering routines)
+	with open("store_raw.sh", "w") as f:
+		f.write("""#!/bin/bash
+		for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
+	os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
+
+
+def load_reference_psd(options):
+	ref_psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))
+
+	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
+	with open('reference_psd.cache', "w") as output_cache_file:
+		output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(options.reference_psd)))
+
+	return ref_psd
+
+
+
 if __name__ == "__main__":
 	import doctest
 	doctest.testmod()
-- 
GitLab