Skip to content
Snippets Groups Projects
Forked from lscsoft / GstLAL
2635 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gstlal_inspiral_pipe 80.88 KiB
#!/usr/bin/env python
#
# Copyright (C) 2011-2014 Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


### The offline gstlal inspiral workflow generator; Use to make HTCondor DAGs to run CBC workflows
###
### Usage:
### ------
###
### It is rare that you would invoke this program in a standalone mode. Usually
### the inputs are complicated and best automated via a Makefile, e.g.,
### Makefile.triggers_example
###
### Diagram of the HTCondor workfow produced
### ----------------------------------------
###
### .. graphviz:: ../images/trigger_pipe.dot
### 


"""
This program makes a dag to run gstlal_inspiral offline
"""

__author__ = 'Chad Hanna <chad.hanna@ligo.org>'

##############################################################################
# import standard modules and append the lalapps prefix to the python path
import sys, os, stat
import itertools
from optparse import OptionParser

##############################################################################
# import the modules we need to build the pipeline
import lal
import lal.series
from lal.utils import CacheEntry
from glue import pipeline
from ligo import segments
from glue.ligolw import ligolw
from glue.ligolw import lsctables
import glue.ligolw.utils as ligolw_utils
import glue.ligolw.utils.segments as ligolw_segments
from gstlal import inspiral, inspiral_pipe
from gstlal import dagparts as gstlaldagparts
from gstlal import datasource

class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	pass
lsctables.use_in(LIGOLWContentHandler)


#
# Utility functions
#


def sim_tag_from_inj_file(injections):
	if injections is None:
		return None
	return injections.replace('.xml', '').replace('.gz', '').replace('-','_')

def get_bank_params(bank_cache, options, verbose = False):
	max_time = 0
	template_mchirp_dict = {}
	for n, cache in enumerate(bank_cache.values()[0]):
		for ce in map(CacheEntry, open(cache)):
			for ce in map(CacheEntry, open(ce.path)):
				xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler)
				snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc)
				max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration')))
				template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[options.overlap[n]/2:-options.overlap[n]/2]), max(snglinspiraltable.getColumnByName('mchirp')[options.overlap[n]/2:-options.overlap[n]/2])]
				xmldoc.unlink()

	return max_time, template_mchirp_dict

def chunks(l, n):
	for i in xrange(0, len(l), n):
		yield l[i:i+n]

def flatten(lst):
    "Flatten one level of nesting"
    return list(itertools.chain.from_iterable(lst))

def subdir_path(dirlist):
	output_path = '/'.join(dirlist)
	try:
		os.mkdir(output_path)
	except:
		pass
	return output_path

#
# get a dictionary of all the disjoint 2+ detector combination segments
#

def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
	segsdict = segments.segmentlistdict()
	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
	start_pad = 512 + max_template_length
	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
	segment_length = int(5 * start_pad)
	for n in range(min_instruments, 1 + len(analyzable_instruments_set)):
		for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n):
			# never analyze H1H2 or H2L1 times
			#if set(ifo_combos) == set(('H1', 'H2')) or set(ifo_combos) == set(('L1', 'H2')):
			#	print >> sys.stderr, "not analyzing: ", ifo_combos, " only time"
			#	continue
			segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos))
			segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg])
			segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad)
			segsdict[frozenset(ifo_combos)] = gstlaldagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad)
			if not segsdict[frozenset(ifo_combos)]:
				del segsdict[frozenset(ifo_combos)]
	return segsdict

def psd_node_gen(refPSDJob, dag, parent_nodes, segsdict, channel_dict, options):
	psd_nodes = {}
	for ifos in segsdict:
		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
		for seg in segsdict[ifos]:
			psd_nodes[(ifos, seg)] = \
				inspiral_pipe.generic_node(refPSDJob, dag, parent_nodes = parent_nodes,
					opts = {"gps-start-time":int(seg[0]),
						"gps-end-time":int(seg[1]),
						"data-source":"frames",
						"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
						"psd-fft-length":options.psd_fft_length,
						"frame-segments-name": options.frame_segments_name},
					input_files = {	"frame-cache":options.frame_cache,
							"frame-segments-file":options.frame_segments_file},
					output_files = {"write-psd":inspiral_pipe.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = subdir_path([refPSDJob.output_path, str(int(seg[0]))[:5]]))}
				)
	return psd_nodes

def inj_psd_node_gen(segsdict, options):
	psd_nodes = {}
	psd_cache_files = {}
	for ce in map(CacheEntry, open(options.psd_cache)):
		psd_cache_files.setdefault(frozenset(lsctables.instrumentsproperty.get(ce.observatory)), []).append((ce.segment, ce.path))
	for ifos in segsdict:
		reference_psd_files = sorted(psd_cache_files[ifos], key = lambda (s, p): s)
		ref_psd_file_num = 0
		for seg in segsdict[ifos]:
			while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]):
				ref_psd_file_num += 1
			psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1]
	ref_psd_parent_nodes = []
	return psd_nodes, ref_psd_parent_nodes

def model_node_gen(modelJob, dag, instruments, options, seg, template_bank):
	if options.mass_model_file is None:
		# choose, arbitrarily, the lowest instrument in alphabetical order
		model_file_name = inspiral_pipe.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = modelJob.output_path)
		model_node = inspiral_pipe.generic_node(modelJob, dag,
			input_files = {"template-bank": template_bank},
			opts = {"model":options.mass_model},
			output_files = {"output": model_file_name},
			parent_nodes = []
		)
		return [model_node], model_file_name
	else:
		return [], options.mass_model_file

def svd_node_gen(svdJob, dag, parent_nodes, psd, bank_cache, options, seg, template_mchirp_dict):
	svd_nodes = {}
	new_template_mchirp_dict = {}
	for ifo, list_of_svd_caches in bank_cache.items():
		bin_offset = 0
		for j, svd_caches in enumerate(list_of_svd_caches):
			svd_caches = map(CacheEntry, open(svd_caches))
			for i, individual_svd_cache in enumerate(ce.path for ce in svd_caches):
				# First sort out the clipleft, clipright options
				clipleft = []
				clipright = []
				ids = []
				mchirp_interval = (float("inf"), 0)
				individual_svd_cache = map(CacheEntry, open(individual_svd_cache))
				for n, f in enumerate(ce.path for ce in individual_svd_cache):
					# handle template bank clipping
					clipleft.append(options.overlap[j] / 2)
					clipright.append(options.overlap[j] / 2)
					ids.append("%d_%d" % (i+bin_offset, n))
					if f in template_mchirp_dict:
						mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))

				svd_bank_name = inspiral_pipe.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = svdJob.output_path)
				if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
					new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval

				svdnode = inspiral_pipe.generic_node(svdJob, dag,
					parent_nodes = parent_nodes,
					opts = {"svd-tolerance":options.tolerance,
						"flow":options.flow,
						"clipleft":clipleft,
						"clipright":clipright,
						"samples-min":options.samples_min[j],
						"samples-max-256":options.samples_max_256,
						"samples-max-64":options.samples_max_64,
						"samples-max":options.samples_max,
						"autocorrelation-length":options.autocorrelation_length,
						"bank-id":ids,
						"identity-transform":options.identity_transform,
						"ortho-gate-fap":0.5},
					input_files = {"reference-psd":psd},
					input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]},
					input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"),
					output_files = {"write-svd":svd_bank_name}
					)
				# impose a priority to help with depth first submission
				svdnode.set_priority(99)
				svd_nodes.setdefault(ifo, []).append(svdnode)
			bin_offset += i+1
	#
	# Plot template/svd bank jobs
	#

	primary_ifo = bank_cache.keys()[0]
	inspiral_pipe.generic_node(plotBanksJob, dag,
				parent_nodes = sum(svd_nodes.values(),[]),
				opts = {"plot-template-bank":"",
					"output-dir": output_dir},
				input_files = {"template-bank-file":options.template_bank}
				)

	return svd_nodes, new_template_mchirp_dict

def create_svd_bank_strings(svd_nodes, instruments = None):
	# FIXME assume that the number of svd nodes is the same per ifo, a good assumption though
	outstrings = []
	for i in range(len(svd_nodes.values()[0])):
		svd_bank_string = ""
		for ifo in svd_nodes:
			if instruments is not None and ifo not in instruments:
				continue
			try:
				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"])
			except AttributeError:
				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i])
		svd_bank_string = svd_bank_string.strip(",")
		outstrings.append(svd_bank_string)
	return outstrings

def svd_bank_cache_maker(svd_bank_strings, injection = False):
	if injection:
		dir_name = "gstlal_inspiral_inj"
	else:
		dir_name = "gstlal_inspiral"
	svd_cache_entries = []
	parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
	for svd_bank_parsed_dict in parsed_svd_bank_strings:
		for filename in svd_bank_parsed_dict.itervalues():
			svd_cache_entries.append(CacheEntry.from_T050017(filename))

	return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries] 

def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):

	inspiral_nodes = {}
	for ifos in segsdict:

		# setup dictionaries to hold the inspiral nodes
		inspiral_nodes[(ifos, None)] = {}	
		ignore = {}
		injection_files = []
		for injections in options.injections:
			min_chirp_mass, max_chirp_mass, injections = injections.split(':')
			injection_files.append(injections)
			min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass)
			inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
			ignore[injections] = []
			for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)):
				if max_chirp_mass <= bounds[0]:
					ignore[injections].append(int(bgbin_index))
					# NOTE putting a break here assumes that the min chirp mass
					# in a subbank increases with bin number, i.e. XXXX+1 has a
					# greater minimum chirpmass than XXXX, for all XXXX. Note
					# that the reverse is not true, bin XXXX+1 may have a lower
					# max chirpmass than bin XXXX.
				elif min_chirp_mass > bounds[1]:
					ignore[injections].append(int(bgbin_index))

		# FIXME choose better splitting?
		numchunks = 10

		# only use a channel dict with the relevant channels
		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)

		# get the svd bank strings
		svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys())

		# get a mapping between chunk counter and bgbin for setting priorities
		bgbin_chunk_map = {}

		for seg in segsdict[ifos]:
			if injection_files:
				output_seg_inj_path = subdir_path([gstlalInspiralInjJob.output_path, str(int(seg[0]))[:5]])
			
			if gstlalInspiralJob is None:
				# injection-only run
				inspiral_nodes[(ifos, None)].setdefault(seg, [None])

			else:
				output_seg_path = subdir_path([gstlalInspiralJob.output_path, str(int(seg[0]))[:5]])
				for chunk_counter, svd_bank_strings in enumerate(chunks(svd_bank_strings_full, numchunks)):
					bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)]
					# setup output names
					output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)]
					output_names = [inspiral_pipe.T050017_filename(ifos, '%s_LLOID' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i, s in enumerate(svd_bank_strings)]
					dist_stat_names = [inspiral_pipe.T050017_filename(ifos, '%s_DIST_STATS' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i,s in enumerate(svd_bank_strings)]

					for bgbin in bgbin_indices:
						bgbin_chunk_map.setdefault(bgbin, chunk_counter)

					# Calculate the appropriate ht-gate-threshold values according to the scale given
					threshold_values = None
					if options.ht_gate_threshold_linear is not None:
						# A scale is given
						mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
						# use max mchirp in a given svd bank to decide gate threshold
						bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
						threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
					else:
						if options.ht_gate_threshold is not None:
							threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given

					# non injection node
					noninjnode = inspiral_pipe.generic_node(gstlalInspiralJob, dag,
							parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]),
							opts = {"psd-fft-length":options.psd_fft_length,
								"ht-gate-threshold":threshold_values,
								"frame-segments-name":options.frame_segments_name,
								"gps-start-time":int(seg[0]),
								"gps-end-time":int(seg[1]),
								"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
								"tmp-space":inspiral_pipe.condor_scratch_space(),
								"track-psd":"",
								"control-peak-time":options.control_peak_time,
								"coincidence-threshold":options.coincidence_threshold,
								"singles-threshold":options.singles_threshold,
								"fir-stride":options.fir_stride,
								"data-source":"frames",
								"local-frame-caching":"",
								"min-instruments":options.min_instruments,
								"reference-likelihood-file":options.reference_likelihood_file
								},
							input_files = {	"time-slide-file":options.time_slide_file,
									"frame-cache":options.frame_cache,
									"frame-segments-file":options.frame_segments_file,
									"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
									"blind-injections":options.blind_injections,
									"veto-segments-file":options.vetoes,
								},
							input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)},
							output_cache_files = {
									"output-cache":output_names,
									"ranking-stat-output-cache":dist_stat_names
								}
							)
					# Set a post script to check for file integrity
					if options.gzip_test:
						noninjnode.set_post_script("gzip_test.sh")
						noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))
					# impose a priority to help with depth first submission
					noninjnode.set_priority(chunk_counter+15)
					inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)

			# process injections
			for injections in injection_files:
				# setup output names
				sim_name = sim_tag_from_inj_file(injections)

				bgbin_svd_bank_strings = [index_bank_string_tuple for i, index_bank_string_tuple in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]]

				for chunk_counter, bgbin_list in enumerate(chunks(bgbin_svd_bank_strings, numchunks)):
					bgbin_indices, svd_bank_strings = zip(*bgbin_list)
					output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices]
					output_names = [inspiral_pipe.T050017_filename(ifos, '%s_LLOID_%s' % (bgbin_index, sim_name), seg, '.xml.gz', path = output_paths[i]) for i, bgbin_index in enumerate(bgbin_indices)]
					svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))]
					try:
						reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
						parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
					except AttributeError:
						# injection-only run
						reference_psd = psd_nodes[(ifos, seg)]
						parents = []

					# Calculate the appropriate ht-gate-threshold values according to the scale given
					threshold_values = None
					if options.ht_gate_threshold_linear is not None:
						# A scale is given
						mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
						# use max mchirp in a given svd bank to decide gate threshold
						bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
						threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
					else:
						if options.ht_gate_threshold is not None:
							threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given

					# setup injection node
					injnode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, parent_nodes = parents,
							opts = {"psd-fft-length":options.psd_fft_length,
								"ht-gate-threshold":threshold_values,
								"frame-segments-name":options.frame_segments_name,
								"gps-start-time":int(seg[0]),
								"gps-end-time":int(seg[1]),
								"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
								"tmp-space":inspiral_pipe.condor_scratch_space(),
								"track-psd":"",
								"control-peak-time":options.control_peak_time,
								"coincidence-threshold":options.coincidence_threshold,
								"singles-threshold":options.singles_threshold,
								"fir-stride":options.fir_stride,
								"data-source":"frames",
								"local-frame-caching":"",
								"min-instruments":options.min_instruments,
								"reference-likelihood-file":options.reference_likelihood_file
								},
							input_files = {	"time-slide-file":options.inj_time_slide_file,
									"frame-cache":options.frame_cache,
									"frame-segments-file":options.frame_segments_file,
									"reference-psd":reference_psd,
									"veto-segments-file":options.vetoes,
									"injections": injections
								},
							input_cache_files = {"svd-bank-cache":svd_names},
							input_cache_file_name = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017(filename) for filename in svd_names], '.cache').replace('SVD', 'SVD_%s' % sim_name),
							output_cache_files = {
									"output-cache":output_names
								}
							)
					# Set a post script to check for file integrity
					if options.gzip_test:
						injnode.set_post_script("gzip_test.sh")
						injnode.add_post_script_arg(" ".join(output_names))
					# impose a priority to help with depth first submission
					if bgbin_chunk_map:
						injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1)
					else:
						injnode.set_priority(chunk_counter+1)
					inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)

	# Replace mchirplo:mchirphi:inj.xml with inj.xml
	options.injections = [inj.split(':')[-1] for inj in options.injections]
	return inspiral_nodes

def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):

	# first get the previous output in a usable form
	lloid_output = {}
	for inj in options.injections + [None]:
		lloid_output[sim_tag_from_inj_file(inj)] = {}
	lloid_diststats = {}
	if options.dist_stats_cache:
		for ce in map(CacheEntry, open(options.dist_stats_cache)):
			lloid_diststats[ce.description.split("_")[0]] = [ce.path]
	for ifos in segsdict:
		for seg in segsdict[ifos]:
			# iterate over the mass space chunks for each segment
			for node in inspiral_nodes[(ifos, None)][seg]:
				if node is None:
					break
				len_out_files = len(node.output_files["output-cache"])
				for f in node.output_files["output-cache"]:
					# Store the output files and the node for use as a parent dependency
					lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node]))
				for f in node.output_files["ranking-stat-output-cache"]:
					lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f)
			for inj in options.injections:
				for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]:
					if injnode is None:
						continue
					for f in injnode.output_files["output-cache"]:
						# Store the output files and the node and injnode for use as a parent dependencies
						bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
						try:
							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode]))
						except KeyError:
							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode]))

	return lloid_output, lloid_diststats

def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWithZerolagJob, calcLikelihoodJob, calcLikelihoodJobInj, lalappsRunSqliteJob, toSqliteJob, marginalizeJob, svd_nodes, inspiral_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, mass_model_add_node, mass_model_file):

	likelihood_nodes = {}
	rankpdf_nodes = []
	rankpdf_zerolag_nodes = []
	outnodes = {}
	instruments = "".join(sorted(instrument_set))
	margnodes = {}

	# NOTE! we rely on there being identical templates in each instrument, so we just take one of the values of the svd_nodes which are a dictionary
	one_ifo_svd_nodes = svd_nodes.values()[0]
	# Here n counts the bins
	# first non-injections, which will get skipped if this is an injections-only run
	for n, (outputs, diststats) in enumerate((lloid_output[None][key], lloid_diststats[key]) for key in sorted(lloid_output[None].keys())):
		inputs = [o[0] for o in outputs]
		parents = []
		[parents.extend(o[1]) for o in outputs]
		# FIXME we keep this here in case we someday want to have a
		# mass bin dependent prior, but it really doesn't matter for
		# the time being.
		priornode = inspiral_pipe.generic_node(createPriorDistStatsJob, dag,
				parent_nodes = [one_ifo_svd_nodes[n]] + mass_model_add_node,
				opts = {"instrument":instrument_set, "background-prior":1, "min-instruments":options.min_instruments},
				input_files = {"svd-file":one_ifo_svd_nodes[n].output_files["write-svd"], "mass-model-file":mass_model_file},
				output_files = {"write-likelihood":inspiral_pipe.T050017_filename(instruments, '%04d_CREATE_PRIOR_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = createPriorDistStatsJob.output_path)}
			)
		# Create a file that has the priors *and* all of the diststats
		# for a given bin marginalized over time. This is all that will
		# be needed to compute the likelihood
		diststats_per_bin_node = inspiral_pipe.generic_node(marginalizeJob, dag,
			parent_nodes = [priornode] + parents,
			opts = {"marginalize":"ranking-stat"},
			input_cache_files = {"likelihood-cache":diststats + [priornode.output_files["write-likelihood"]]},
			output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = marginalizeJob.output_path)},
			input_cache_file_name = inspiral_pipe.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.cache')
			)

		calcranknode = inspiral_pipe.generic_node(calcRankPDFsJob, dag,
				parent_nodes = [diststats_per_bin_node],
				opts = {"ranking-stat-samples":options.ranking_stat_samples},
				input_files = {"":diststats_per_bin_node.output_files["output"]},
				output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_CALC_RANK_PDFS' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsJob.output_path)}
			)

		calcrankzerolagnode = inspiral_pipe.generic_node(calcRankPDFsWithZerolagJob, dag,
				parent_nodes = [diststats_per_bin_node],
				opts = {"add-zerolag-to-background":"","ranking-stat-samples":options.ranking_stat_samples},
				input_files = {"":diststats_per_bin_node.output_files["output"]},
				output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_CALC_RANK_PDFS_WZL' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsWithZerolagJob.output_path)}
			)

		margnodes['%04d' %(n,)] = diststats_per_bin_node
		rankpdf_nodes.append(calcranknode)
		rankpdf_zerolag_nodes.append(calcrankzerolagnode)
		
		# Break up the likelihood jobs into chunks to process fewer files, e.g, 16
		likelihood_nodes.setdefault(None,[]).append(
			[inspiral_pipe.generic_node(calcLikelihoodJob, dag,
				parent_nodes = [diststats_per_bin_node],
				opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
				input_files = {"likelihood-url":diststats_per_bin_node.output_files["output"]},
				input_cache_files = {"input-cache":chunked_inputs}
				) for chunked_inputs in chunks(inputs, 16)]
			)

	# then injections
	for inj in options.injections:
		for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[sim_tag_from_inj_file(inj)][key], lloid_diststats[key], key) for key in sorted(lloid_output[sim_tag_from_inj_file(inj)].keys())):
			if outputs is None:
				continue
			inputs = [o[0] for o in outputs]
			parents = []
			[parents.extend(o[1]) for o in outputs]
			if margnodes:
				parents.append(margnodes[bgbin_index])
				likelihood_url = margnodes[bgbin_index].output_files["output"]
			else:
				likelihood_url = diststats[0]
			# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
			likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
				[inspiral_pipe.generic_node(calcLikelihoodJobInj, dag,
					parent_nodes = parents,
					opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
					input_files = {"likelihood-url":likelihood_url},
					input_cache_files = {"input-cache":chunked_inputs}
					) for chunked_inputs in chunks(inputs, 16)]
				)

	
	# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
	files_to_group = 40
	for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
		# Flatten the nodes for this sub bank
		nodes = flatten(nodes)
		merge_nodes = []
		# Flatten the input/output files from calc_likelihood
		inputs = flatten([node.input_files["input-cache"] for node in nodes])
		if inj is None:
			# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
			for n in range(0, len(inputs), files_to_group):
				merge_nodes.append(inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes,
					opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
					input_files = {"":inputs[n:n+files_to_group]}
					)
				)
				if options.copy_raw_results:
					merge_nodes[-1].set_pre_script("store_raw.sh")
					merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))

			# Merging all the dbs from the same sub bank
			for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
				db = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite') 
				db = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(db).description[:4]]), db)
				sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = merge_nodes,
					opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
					input_cache_files = {"input-cache":inputs},
					output_files = {"database":db},
					input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
				)
				sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
					input_files = {"":db}
				)
				outnodes.setdefault(None, []).append(sqlitenode)
		else:
			# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
			for n in range(0, len(inputs), files_to_group):
				merge_nodes.append(inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes,
					opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
					input_files = {"":inputs[n:n+files_to_group]}
					)
				)
				if options.copy_raw_results:
					merge_nodes[-1].set_pre_script("store_raw.sh")
					merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))

			# Merging all the dbs from the same sub bank and injection run
			for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
				injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite')
				injdb = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(injdb).description[:4]]), injdb)
				sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = merge_nodes,
					opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
					input_cache_files = {"input-cache":inputs},
					output_files = {"database":injdb},
					input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
				)
				sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
					input_files = {"":injdb}
				)
				outnodes.setdefault(sim_tag_from_inj_file(inj), []).append(sqlitenode)

	# make sure outnodes has a None key, even if its value is an empty list
	outnodes.setdefault(None, [])

	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes

def merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options):
	rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
	rankpdf_zerolag_nodes = []
	outnodes = {}
	if options.num_files_per_background_bin == 1:
		bgbin_lloid_map = {}
		# Get list of all files for each background bin (will be same length for each bin)
		for ce in map(CacheEntry, open(options.lloid_cache)):
			bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)

		if len(bgbin_lloid_map.values()[0]) == 1:
			# Starting with 1:1 mapping between files and bins,
			# thus no merging is needed yet
			outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
				outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]

		else:
			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
				noninjdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
				# merge all of the dbs from the same subbank
				sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
					opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
					input_cache_files = {"input-cache":dbs},
					output_files = {"database":noninjdb},
					input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
				)

				sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
					input_files = {"":noninjdb}
				)

				outnodes.setdefault(None, []).append(sqlitenode)
			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
				bgbin_lloid_map = {}
				for ce in map(CacheEntry, open(inj_lloid_cache)):
					bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)

				for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
					injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
					# merge all of the dbs from the same subbank
					sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
						opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
						input_cache_files = {"input-cache":dbs},
						output_files = {"database":injdb},
						input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
					)

					sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
						opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
						input_files = {"":injdb}
					)

					outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)

	else:
		# Starting with output of analysis before naming convention
		# update (commit 5efd78fee6b371c999f510d07be33ec64f385695),
		# so all lloid files contain gps times for entire run, and are
		# numbered by iterating through segments in a given bin first
		# (e.g. files 0000 to 0009 may all belong to bin 0000, then
		# files 0010 to 0019 would all belong to bin 0001, etc)
		for ce_list in chunks(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
			hi_index = ce_list[-1].description.split('_')[0]
			noninjdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))

			# merge all of the dbs from the same subbank
			sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
				opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
				input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
				output_files = {"database":noninjdb},
				input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
			)

			sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
				opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
				input_files = {"":noninjdb}
			)

			outnodes.setdefault(None, []).append(sqlitenode)

		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
			for ce_list in chunks(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
				hi_index = ce_list[-1].description.split('_')[0]
				injdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))

				# merge all of the dbs from the same subbank
				sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
					opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
					input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
					output_files = {"database":injdb},
					input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
				)

				sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
					input_files = {"":injdb}
				)

				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)

	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes

def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSqliteJob, toSqliteNoCacheJob, cpJob, innodes, ligolw_add_nodes, options, instruments):

	num_chunks = 50

	if options.vetoes is None:
		vetoes = []
	else:
		vetoes = [options.vetoes]

	chunk_nodes = []
	dbs_to_delete = []
	# Process the chirp mass bins in chunks to paralellize the merging process
	for chunk, nodes in enumerate(chunks(innodes[None], num_chunks)):
		try:
			dbs = [node.input_files[""] for node in nodes]
			parents = nodes

		except AttributeError:
			# analysis started at merger step but seeded by lloid files which
			# have already been merged into one file per background
			# bin, thus the analysis will begin at this point
			dbs = nodes
			parents = []

		# Merge the final non injection database into chunks
		noninjdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
		sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = parents,
			opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
			input_cache_files = {"input-cache": dbs},
			output_files = {"database":noninjdb},
			input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
		)

		# cluster the final non injection database
		noninjsqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
			opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
			input_files = {"":noninjdb}
		)
		chunk_nodes.append(noninjsqlitenode)
		dbs_to_delete.append(noninjdb)

	# Merge the final non injection database
	outnodes = []
	injdbs = []
	if options.non_injection_db:
		# injection-only run
		noninjdb = options.non_injection_db

	else:
		noninjdb = inspiral_pipe.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
		sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = chunk_nodes,
			opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
			input_files = {"": (vetoes + [options.frame_segments_file])},
			input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
			output_files = {"database":noninjdb},
			input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
		)

		# cluster the final non injection database
		noninjsqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
			opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
			input_files = {"":noninjdb}
		)

		cpnode = inspiral_pipe.generic_node(cpJob, dag, parent_nodes = [noninjsqlitenode],
			input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
		)

		outnodes.append(cpnode)

	# FIXME far-injections currently doesnt work, either fix it or delete it
	#for injections, far_injections in zip(options.injections, options.far_injections):
	if options.injections:
		iterable_injections = options.injections
	else:
		iterable_injections = options.injections_for_merger

	for injections in iterable_injections:
		# extract only the nodes that were used for injections
		chunk_nodes = []

		for chunk, injnodes in enumerate(chunks(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
			try:
				dbs = [injnode.input_files[""] for injnode in injnodes]
				parents = injnodes
			except AttributeError:
				dbs = injnodes
				parents = []

			# Setup the final output names, etc.
			injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)


			# merge
			sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = parents,
				opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
				input_cache_files = {"input-cache":dbs},
				output_files = {"database":injdb},
				input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
			)

			# cluster
			clusternode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
				opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
				input_files = {"":injdb}
			)

			chunk_nodes.append(clusternode)
			dbs_to_delete.append(injdb)


		# Setup the final output names, etc.
		injdb = inspiral_pipe.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
		injdbs.append(injdb)
		injxml = injdb.replace('.sqlite','.xml.gz')

		# FIXME far-injections currently doesnt work, either fix it or delete it
		'''
		# If there are injections that are too far away to be seen in a separate file, add them now. 
		if far_injections is not None:
			xml_input = [injxml] + [far_injections]
		else:
			xml_input = injxml
		'''
		xml_input = injxml

		# merge
		sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = chunk_nodes + ligolw_add_nodes,
			opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
			input_files = {"": (vetoes + [options.frame_segments_file, injections])},
			input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
			output_files = {"database":injdb},
			input_cache_file_name = injdb.replace('.sqlite','.cache')
		)

		# cluster
		clusternode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
			opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
			input_files = {"":injdb}
		)


		clusternode = inspiral_pipe.generic_node(toXMLJob, dag, parent_nodes = [clusternode],
			opts = {"tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
			output_files = {"extract":injxml},
			input_files = {"database":injdb}
		)

		inspinjnode = inspiral_pipe.generic_node(ligolwInspinjFindJob, dag, parent_nodes = [clusternode],
			opts = {"time-window":0.9},
			input_files = {"":injxml}
		)

		sqlitenode = inspiral_pipe.generic_node(toSqliteNoCacheJob, dag, parent_nodes = [inspinjnode],
			opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
			output_files = {"database":injdb},
			input_files = {"":xml_input}
		)

		cpnode = inspiral_pipe.generic_node(cpJob, dag, parent_nodes = [sqlitenode],
			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
		)
			
		outnodes.append(cpnode)

	return injdbs, noninjdb, outnodes, dbs_to_delete

def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, rankpdf_nodes, rankpdf_zerolag_nodes, injdbs, noninjdb, final_sqlite_nodes):
	# compute FAPs and FARs
	# split up the marginilization into groups of 10
	try:
		margin = [node.output_files["output"] for node in rankpdf_nodes]
		parents = rankpdf_nodes
		margin_zerolag =  [node.output_files["output"] for node in rankpdf_zerolag_nodes]
		parents_zerolag = rankpdf_zerolag_nodes
	except AttributeError:
		# analysis started at merger step
		margin = rankpdf_nodes
		parents = []
		margin_zerolag = rankpdf_zerolag_nodes
		parents_zerolag = []
	margout = []
	margzerolagout = []
	margnodes = []
	margzerolagnodes = []
	margnum = 16
	for i,n in enumerate(range(0, len(margin), margnum)):
		margout.append(inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin[n:n+margnum]], '.xml.gz', path = marginalizeJob.output_path))
		margnodes.append(inspiral_pipe.generic_node(marginalizeJob, dag, parent_nodes = parents,
			opts = {"marginalize":"ranking-stat-pdf"},
			output_files = {"output":margout[-1]}, 
			input_cache_files = {"likelihood-cache":margin[n:n+margnum]},
			input_cache_file_name = os.path.basename(margout[-1]).replace('.xml.gz','.cache')
		))

		if rankpdf_zerolag_nodes:
			margzerolagout.append(inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin_zerolag[n:n+margnum]], '.xml.gz', path = marginalizeWithZerolagJob.output_path))
			margzerolagnodes.append(inspiral_pipe.generic_node(marginalizeWithZerolagJob, dag, parent_nodes = parents_zerolag,
				opts = {"marginalize":"ranking-stat-pdf"},
				output_files = {"output":margzerolagout[-1]},
				input_cache_files = {"likelihood-cache":margin_zerolag[n:n+margnum]},
				input_cache_file_name = os.path.basename(margzerolagout[-1]).replace('.xml.gz','.cache')
			))


	if options.marginalized_likelihood_file:
		# injection-only run
		parents = final_sqlite_nodes
		parents_zerolag = final_sqlite_nodes
		marginalized_likelihood_file = options.marginalized_likelihood_file
		marginalized_likelihood_with_zerolag_file = options.marginalized_likelihood_with_zerolag_file

	else:
		margnode = inspiral_pipe.generic_node(marginalizeJob, dag, parent_nodes = margnodes,
			opts = {"marginalize":"ranking-stat-pdf"},
			output_files = {"output":"marginalized_likelihood.xml.gz"},
			input_cache_files = {"likelihood-cache":margout},
			input_cache_file_name = "marginalized_likelihood.cache"
		)
		parents = [margnode] + final_sqlite_nodes
		marginalized_likelihood_file = margnode.output_files["output"]

		margnode = inspiral_pipe.generic_node(marginalizeWithZerolagJob, dag, parent_nodes = margzerolagnodes,
			opts = {"marginalize":"ranking-stat-pdf"},
			output_files = {"output":"marginalized_likelihood_with_zerolag.xml.gz"},
			input_cache_files = {"likelihood-cache":margzerolagout},
			input_cache_file_name = "marginalized_likelihood_with_zerolag.cache"
		)
		parents_zerolag = [margnode] + final_sqlite_nodes
		marginalized_likelihood_with_zerolag_file = margnode.output_files["output"]
	
	
	farnode = inspiral_pipe.generic_node(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents,
		opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
		input_files = {"background-bins-file":marginalized_likelihood_file, "injection-db":injdbs, "non-injection-db":noninjdb}
	)
	
	inspiral_pipe.generic_node(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents_zerolag,
		opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
		input_files = {"background-bins-file":marginalized_likelihood_with_zerolag_file, "injection-db":[injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs], "non-injection-db":noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL')}
	)

	return farnode, margout + margzerolagout

def parse_command_line():
	parser = OptionParser(description = __doc__)

	# generic data source options
	datasource.append_options(parser)
	parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s.  Note that 50% will be used for zero-padding.")

	# reference_psd
	parser.add_option("--reference-psd", help = "Don't measure PSDs, use this one instead")

	# Template bank
	parser.add_option("--template-bank", metavar = "filename", help = "Set the template bank xml file.")
	parser.add_option("--mass-model", metavar = "filename", help = "Set the name of the mass model. Options are 'file', 'salpeter'")
	parser.add_option("--mass-model-file", metavar = "filename", help = "Set the name of the mass model file, e.g., mass_model.h5.  Required if --mass-model=file")

	# SVD bank construction options
	parser.add_option("--overlap", metavar = "num", type = "int", action = "append", help = "set the factor that describes the overlap of the sub banks, must be even!")
	parser.add_option("--autocorrelation-length", type = "int", default = 201, help = "The minimum number of samples to use for auto-chisquared, default 201 should be odd")
	parser.add_option("--samples-min", type = "int", action = "append", help = "The minimum number of samples to use for time slices default 1024 (can be given multiple times, one for each time --bank-cache is invoked)")
	parser.add_option("--samples-max-256", type = "int", default = 1024, help = "The maximum number of samples to use for time slices with frequencies above 256Hz, default 1024")
	parser.add_option("--samples-max-64", type = "int", default = 2048, help = "The maximum number of samples to use for time slices with frequencies above 64Hz, default 2048")
	parser.add_option("--samples-max", type = "int", default = 4096, help = "The maximum number of samples to use for time slices with frequencies below 64Hz, default 4096")
	parser.add_option("--bank-cache", metavar = "filenames", action = "append", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc.. (can be given multiple times)")
	parser.add_option("--tolerance", metavar = "float", type = "float", default = 0.9999, help = "set the SVD tolerance, default 0.9999")
	parser.add_option("--flow", metavar = "num", type = "float", default = 40, help = "set the low frequency cutoff, default 40 (Hz)")
	parser.add_option("--identity-transform", action = "store_true", help = "Use identity transform, i.e. no SVD")
	
	# trigger generation options
	parser.add_option("--vetoes", metavar = "filename", help = "Set the veto xml file.")
	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
	parser.add_option("--inj-time-slide-file", metavar = "filename", help = "Set the time slide table xml file for injections")
	parser.add_option("--web-dir", metavar = "directory", help = "Set the web directory like /home/USER/public_html")
	parser.add_option("--fir-stride", type="int", metavar = "secs", default = 8, help = "Set the duration of the fft output blocks, default 8")
	parser.add_option("--control-peak-time", type="int", default = 8, metavar = "secs", help = "Set the peak finding time for the control signal, default 8")
	parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005).  The light-travel time between instruments will be added automatically in the coincidence test.")
	parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
	parser.add_option("--reference-likelihood-file", metavar = "file", help = "Set a reference likelihood file to compute initial likelihood ratios. Required")
	parser.add_option("--num-banks", metavar = "str", help = "The number of parallel subbanks per gstlal_inspiral job. can be given as a list like 1,2,3,4 then it will split up the bank cache into N groups with M banks each.")
	parser.add_option("--max-inspiral-jobs", type="int", metavar = "jobs", help = "Set the maximum number of gstlal_inspiral jobs to run simultaneously, default no constraint.")
	parser.add_option("--ht-gate-threshold", type="float", help="set a threshold on whitened h(t) to veto glitches")
	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
	parser.add_option("--inspiral-executable", default = "gstlal_inspiral", help = "Options gstlal_inspiral | gstlal_iir_inspiral, default gstlal_inspiral")
	parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently.  Has the effect of having hidden signals in the input data. Separate injection runs using the --injections option will still occur.")
	# FIXME far-injections currently doesnt work, either fix it or delete it
	#parser.add_option("--far-injections", action = "append", help = "Injection files with injections too far away to be seen and are not filtered. Required. See https://www.lsc-group.phys.uwm.edu/ligovirgo/cbcnote/NSBH/MdcInjections/MDC1 for example.")
	parser.add_option("--singles-threshold", default=float("inf"), action = "store", metavar="THRESH", help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
	parser.add_option("--copy-raw-results", default=False, action = "store_true", help = "Copy raw gstlal_inspiral results before applying clustering and other lossy operations.")
	parser.add_option("--gzip-test", default=False, action = "store_true", help = "Perform gzip --test on all output files.")
	parser.add_option("--verbose", action = "store_true", help = "Be verbose")
	parser.add_option("--disable-calc-inj-snr", default=False, action = "store_true", help = "Disable injection SNR calculation")
	parser.add_option("--ranking-stat-samples", metavar = "N", default = 2**24, type = "int", help = "Construct ranking statistic histograms by drawing this many samples from the ranking statistic generator (default = 2^24).")

	# Override the datasource injection option
	parser.remove_option("--injections")
	parser.add_option("--injections", action = "append", help = "append injection files to analyze. Must prepend filename with X:Y:, where X and Y are floats, e.g. 1.2:3.1:filename, so that the injections are only searched for in regions of the template bank with X <= chirp mass < Y.")

	# Data from a zero lag run in the case of an injection-only run.
	parser.add_option("--dist-stats-cache", metavar = "filename", help = "Set the cache file for dist stats (required iff running injection-only analysis)")
	parser.add_option("--svd-bank-cache", metavar = "filename", help = "Set the cache file for svd banks (required iff running injection-only analysis)")
	parser.add_option("--psd-cache", metavar = "filename", help = "Set the cache file for psd (required iff running injection-only analysis)")
	parser.add_option("--non-injection-db", metavar = "filename", help = "Set the non injection data base file (required iff running injection-only analysis)")
	parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
	parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")

	# Data from a previous run in the case of a run that starts at the merger step
	parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
	parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
	parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
	parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
	parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")

	# Condor commands
	parser.add_option("--request-cpu", default = "4", metavar = "integer", help = "set the inspiral CPU count, default = 4")
	parser.add_option("--request-memory", default = "7GB", metavar = "integer", help = "set the inspiral memory, default = 7GB")
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")

	options, filenames = parser.parse_args()

	if options.mass_model not in ("salpeter", "file"):
		raise ValueError("--mass-model must be 'salpeter' or 'file'")
	if options.mass_model == "file" and not options.mass_model_file:
		raise ValueError("--mass-model-file must be provided if --mass-model=file")

	if options.num_banks:
		options.num_banks = [int(v) for v in options.num_banks.split(",")]

	if not options.samples_min and not options.svd_bank_cache:
		options.samples_min = [1024]*len(options.bank_cache)

	if not options.overlap and not options.svd_bank_cache:
		options.overlap = [0]*len(options.bank_cache)
	
	if not options.svd_bank_cache and any(overlap % 2 for overlap in options.overlap):
		raise ValueError("overlap must be even")

	if not options.svd_bank_cache and not (len(options.samples_min) == len(options.bank_cache) == len(options.overlap)):
		raise ValueError("must provide same number of inputs for --samples-min, --bank-cache, --overlap")

	missing_injection_options = []
	for option in ("dist_stats_cache", "svd_bank_cache", "psd_cache", "non_injection_db", "marginalized_likelihood_file", "marginalized_likelihood_with_zerolag_file"):
		if getattr(options, option) is None:
			missing_injection_options.append(option)
	if len(missing_injection_options) > 0 and len(missing_injection_options) < 6:
		raise ValueError("missing injection-only options %s." % ", ".join([option for option in missing_injection_options]))
	if len(missing_injection_options) == 0 and options.num_banks:
		raise ValueError("cant specify --num-banks in injection-only run")

	missing_merger_options = []
	for option in ("lloid_cache", "inj_lloid_cache", "rank_pdf_cache"):
		if getattr(options, option) is None:
			missing_merger_options.append(option)
	if len(missing_injection_options) > 0 and len(missing_injection_options) < 3:
		raise ValueError("missing merger-run options %s." % ", ".join([option for option in missing_injection_options]))
	if len(missing_injection_options) == 0 and options.num_banks:
		raise ValueError("cant specify --num-banks in a run starting at the merger step")


	fail = ""
	required_options = []
	if len(missing_merger_options) == 3:
		required_options.append("reference_likelihood_file")
		if len(missing_injection_options) == 6:
			required_options.append("bank_cache")

	for option in required_options:
		if getattr(options, option) is None:
			fail += "must provide option %s\n" % (option)
	if fail: raise ValueError, fail

	# FIXME far-injections currently doesnt work, either fix it or delete it
	'''
	if options.far_injections is not None and len(options.injections) != len(options.far_injections):
		raise ValueError("number of injection files and far injection files must be equal")
	if options.far_injections is None:
		options.far_injections = [None for inj in options.injections]
	'''


	#FIXME a hack to find the sql paths
	share_path = os.path.split(inspiral_pipe.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal')
	options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
	options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')

	return options, filenames


#
# Useful variables
#

options, filenames = parse_command_line()

if options.bank_cache:
	# FIXME Add feature for multiple bank caches to online pipeline so that
	# this functionality can return to inspiral_pipe
	bank_cache = {}
	for bank_cache_str in options.bank_cache:
		for c in bank_cache_str.split(','):
			ifo = c.split("=")[0]
			cache = c.replace(ifo+"=","")
			bank_cache.setdefault(ifo, []).append(cache)
	detectors = datasource.GWDataSourceInfo(options)
	channel_dict = detectors.channel_dict
	if bank_cache:
		instruments = "".join(sorted(bank_cache.keys()))
		instrument_set = bank_cache.keys()
	boundary_seg = detectors.seg

elif options.svd_bank_cache:
	detectors = datasource.GWDataSourceInfo(options)
	channel_dict = detectors.channel_dict
	boundary_seg = detectors.seg

else:
	with open(options.rank_pdf_cache) as f:
		ce = CacheEntry(f.readline())
	boundary_seg = ce.segment
	instruments = ce.observatory

output_dir = "plots"

#
# Setup the dag
#

try:
	os.mkdir("logs")
except:
	pass
dag = inspiral_pipe.DAG("trigger_pipe")

if options.max_inspiral_jobs is not None:
	dag.add_maxjobs_category("INSPIRAL", options.max_inspiral_jobs)

#
# Make an xml integrity checker
#

if options.gzip_test:
	f = open("gzip_test.sh", "w")
	f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
	f.close()
	os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)

#
# A pre script to backup data before feeding to lossy programs
# (e.g. clustering routines)
#

f = open("store_raw.sh", "w")
f.write("""#!/bin/bash
for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
f.close()
os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)


#
# setup the job classes
#

if options.dist_stats_cache:
	# injection-only run
	gstlalInspiralJob = None
	createPriorDistStatsJob = None
	calcRankPDFsJob = None
	calcRankPDFsWithZerolagJob = None
	calcLikelihoodJob = None
	marginalizeJob = None
	marginalizeWithZerolagJob = None

elif options.lloid_cache:
	# analysis starting at merger step
	marginalizeJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	marginalizeWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))

else:
	# set up jobs only needed for zerolag run
	refPSDJob = inspiral_pipe.generic_job("gstlal_reference_psd", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"}))
	medianPSDJob = inspiral_pipe.generic_job("gstlal_median_of_psds", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	plotBanksJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_banks", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	svdJob = inspiral_pipe.generic_job("gstlal_svd_bank", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	modelJob = inspiral_pipe.generic_job("gstlal_inspiral_mass_model", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	modelAddJob = inspiral_pipe.generic_job("gstlal_inspiral_add_mass_models", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	horizonJob = inspiral_pipe.generic_job("gstlal_plot_psd_horizon", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	gstlalInspiralJob = inspiral_pipe.generic_job(options.inspiral_executable, condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
	createPriorDistStatsJob = inspiral_pipe.generic_job("gstlal_inspiral_create_prior_diststats", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	calcRankPDFsJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_rank_pdfs", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"}))
	calcRankPDFsWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_rank_pdfs", tag_base = "gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"}))
	calcLikelihoodJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	marginalizeJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
	marginalizeWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))

gstlalInspiralInjJob = inspiral_pipe.generic_job(options.inspiral_executable, tag_base="gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
injSplitterJob = inspiral_pipe.generic_job("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
gstlalInjSnrJob = inspiral_pipe.generic_job("gstlal_inspiral_injection_snr", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"}))
ligolwAddJob = inspiral_pipe.generic_job("ligolw_add", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
calcLikelihoodJobInj = inspiral_pipe.generic_job("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
gstlalInspiralComputeFarFromSnrChisqHistogramsJob = inspiral_pipe.generic_job("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
ligolwInspinjFindJob = inspiral_pipe.generic_job("lalapps_inspinjfind", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
toSqliteJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
toSqliteNoCacheJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml_inj_final", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
toXMLJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_to_xml", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
lalappsRunSqliteJob = inspiral_pipe.generic_job("lalapps_run_sqlite", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotSummaryJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotSummaryIsolatePrecessionJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotIndividualInjectionsSummaryJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotIndividualInjectionsSummaryIsolatePrecessionJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotSensitivityJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_sensitivity", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
pageJob = inspiral_pipe.generic_job("gstlal_inspiral_summary_page", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotbackgroundJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_background", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
cpJob = inspiral_pipe.generic_job("cp", tag_base = "cp", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
rmJob = inspiral_pipe.generic_job("rm", tag_base = "rm_intermediate_merger_products", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))

#
# Get mchirp boundaries of banks, maximum duration of templates, and analysis segments
#

if options.bank_cache:
	max_time, template_mchirp_dict = get_bank_params(bank_cache, options)
	segsdict = analysis_segments(set(bank_cache.keys()), detectors.frame_segments, boundary_seg, max_time, options.min_instruments)

if options.psd_cache:
	template_mchirp_dict, svd_nodes, max_time = inspiral_pipe.get_svd_bank_params(options.svd_bank_cache)
	segsdict = analysis_segments(set(svd_nodes.keys()), detectors.frame_segments, boundary_seg, max_time, options.min_instruments)
	psd_nodes, ref_psd_parent_nodes = inj_psd_node_gen(segsdict, options)
	instruments = "".join(sorted(svd_nodes.keys()))
	instrument_set = svd_nodes.keys()

elif options.lloid_cache:
	# starting analysis at merger step, nothing to do here
	pass

elif options.reference_psd is None:
	#
	# Compute the PSDs for each segment
	#


	psd_nodes = psd_node_gen(refPSDJob, dag, [], segsdict, channel_dict, options)

	#
	# Make the reference PSD cache
	#

	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
	output_cache_file = open('reference_psd.cache', "w")
	for node in psd_nodes.values():
		output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(node.output_files["write-psd"])))
	output_cache_file.close()

	#
	# plot the horizon distance
	#

	inspiral_pipe.generic_node(horizonJob, dag,
		parent_nodes = psd_nodes.values(),
		input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]},
		output_files = {"":inspiral_pipe.T050017_filename(instruments, "HORIZON", boundary_seg, '.png', path = output_dir)}
	)

	#
	# compute the median PSD
	#

	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
	median_psd_node = \
		inspiral_pipe.generic_node(medianPSDJob, dag,
			parent_nodes = psd_nodes.values(),
			input_files = {"input-cache": "reference_psd.cache"},
			output_files = {"output-name": inspiral_pipe.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))}
		)

	ref_psd = median_psd_node.output_files["output-name"]
	ref_psd_parent_nodes = [median_psd_node]

else:
	ref_psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))

	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
	output_cache_file = open('reference_psd.cache', "w")
	output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(options.reference_psd)))
	output_cache_file.close()

	ref_psd_parent_nodes = []

#
# Calculate Expected SNR jobs
#

num_split_inj_snr_jobs = 100
ligolw_add_nodes = []

if not options.lloid_cache and not options.disable_calc_inj_snr:
	for inj in options.injections:
		inj_snr_nodes = []

		inj_splitter_node = inspiral_pipe.generic_node(injSplitterJob, dag, parent_nodes=[],
			opts = {"output-path":injSplitterJob.output_path, "usertag": sim_tag_from_inj_file(inj.split(":")[-1]), "nsplit": num_split_inj_snr_jobs},
			input_files = {"": inj.split(":")[-1]}
		)
		inj_splitter_node.set_priority(98)

		# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
		for i in xrange(num_split_inj_snr_jobs):
			injSNRnode = inspiral_pipe.generic_node(gstlalInjSnrJob, dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node],
				opts = {"flow":options.flow},
				input_files = {"injection-file": "%s/%s_INJ_SPLIT_%04d.xml" % (injSplitterJob.output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i), "reference-psd-cache": "reference_psd.cache"
				}
			)
			injSNRnode.set_priority(98)
			inj_snr_nodes.append(injSNRnode)

		ligolw_add_nodes.append(inspiral_pipe.generic_node(ligolwAddJob, dag, parent_nodes=inj_snr_nodes,
			input_files = {"": ' '.join(["%s/%s_INJ_SPLIT_%04d.xml" % (injSplitterJob.output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in xrange(num_split_inj_snr_jobs)])},
			opts = {"ilwdchar-compat":""},
			output_files = {"output": inj.split(":")[-1]}
		))

if options.bank_cache:
	#
	# Compute SVD banks
	#
	#svd_nodes, template_mchirp_dict = svd_node_gen(svdJob, dag, ref_psd_parent_nodes, ref_psd, inspiral_pipe.build_bank_groups(bank_cache, options.num_banks), options, boundary_seg, template_mchirp_dict)
	svd_nodes, template_mchirp_dict = svd_node_gen(svdJob, dag, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, template_mchirp_dict)
	model_add_node, model_file_name = model_node_gen(modelJob, dag, instruments, options, boundary_seg, options.template_bank)

if not options.lloid_cache:
	#
	# Inspiral jobs by segment
	#

	inspiral_nodes = inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)

	#
	# Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
	#

	lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)

	#
	# Setup likelihood jobs, clustering and/or merging
	#

	rankpdf_nodes, rankpdf_zerolag_nodes, outnodes = rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWithZerolagJob, calcLikelihoodJob, calcLikelihoodJobInj, lalappsRunSqliteJob, toSqliteJob, marginalizeJob, svd_nodes, inspiral_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_add_node, model_file_name)

else:
	#
	# Merge lloid files into 1 file per bin if not already 1 file per bin
	#

	rankpdf_nodes, rankpdf_zerolag_nodes, outnodes = merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options)

#
# after all of the likelihood ranking and preclustering is finished put everything into single databases based on the injection file (or lack thereof)
#

injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSqliteJob, toSqliteNoCacheJob, cpJob, outnodes, ligolw_add_nodes, options, instruments)

#
# Compute FAP
#

farnode, margfiles_to_delete = compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, rankpdf_nodes, rankpdf_zerolag_nodes, injdbs, noninjdb, final_sqlite_nodes)

# make summary plots
plotnodes = []

plotnodes.append(inspiral_pipe.generic_node(plotSummaryJob, dag, parent_nodes=[farnode],
	opts = {"segments-name": options.frame_segments_name, "tmp-space": inspiral_pipe.condor_scratch_space(), "user-tag": "ALL_LLOID_COMBINED", "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""},
	input_files = {"":[noninjdb] + injdbs}
))

plotnodes.append(inspiral_pipe.generic_node(plotSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode],
	opts = {"segments-name": options.frame_segments_name, "tmp-space": inspiral_pipe.condor_scratch_space(), "user-tag": "PRECESSION_LLOID_COMBINED", "plot-group":1, "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""},
	input_files = {"":[noninjdb] + injdbs}
))

for injdb in injdbs:
	plotnodes.append(inspiral_pipe.generic_node(plotIndividualInjectionsSummaryJob, dag, parent_nodes=[farnode],
		opts = {"segments-name": options.frame_segments_name, "tmp-space":inspiral_pipe.condor_scratch_space(), "user-tag":injdb.replace(".sqlite","").split("-")[1], "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""},
		input_files = {"":[noninjdb] + [injdb]}
	))

	plotnodes.append(inspiral_pipe.generic_node(plotIndividualInjectionsSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode],
		opts = {"segments-name": options.frame_segments_name, "tmp-space":inspiral_pipe.condor_scratch_space(), "user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""},
		input_files = {"":[noninjdb] + [injdb]}
	))

# make sensitivity plots
plotnodes.append(inspiral_pipe.generic_node(plotSensitivityJob, dag, parent_nodes=[farnode],
	opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir, "tmp-space":inspiral_pipe.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, "data-segments-name":"datasegments"},
	input_files = {"zero-lag-database":noninjdb, "":injdbs}
))
for injdb in injdbs:
	plotnodes.append(inspiral_pipe.generic_node(plotSensitivityJob, dag, parent_nodes=[farnode],
		opts = {"user-tag":injdb.replace(".sqlite","").split("-")[1], "output-dir":output_dir, "tmp-space":inspiral_pipe.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, "data-segments-name":"datasegments"},
		input_files = {"zero-lag-database":noninjdb, "":injdb}
	))


# make background plots
plotnodes.append(inspiral_pipe.generic_node(plotbackgroundJob, dag, parent_nodes = [farnode], opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir}, input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb}))

# make a web page
inspiral_pipe.generic_node(pageJob, dag, parent_nodes = plotnodes, 
	opts = {"title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])), "webserver-dir":options.web_dir, "glob-path":output_dir, "output-user-tag":["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"] + [injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs] + [injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs]}
)

#
# rm intermediate merger products
#
for db in dbs_to_delete:
	inspiral_pipe.generic_node(rmJob, dag, parent_nodes = plotnodes,
		input_files = {"": db}
	)

for margfile in margfiles_to_delete:
	inspiral_pipe.generic_node(rmJob, dag, parent_nodes = plotnodes,
		input_files = {"": margfile}
	)

#
# all done
#

dag.write_sub_files()
dag.write_dag()
dag.write_script()
dag.write_cache()