#!/usr/bin/env python
#
# Copyright (C) 2011-2014 Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


### The offline gstlal inspiral workflow generator; used to make HTCondor DAGs to run CBC workflows
###
### Usage:
### ------
###
### It is rare that you would invoke this program in a standalone mode. Usually
### the inputs are complicated and best automated via a Makefile, e.g.,
### Makefile.triggers_example
###
### Diagram of the HTCondor workflow produced
### ----------------------------------------
###
### .. graphviz:: ../images/trigger_pipe.dot
### 
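### Example invocation (illustrative only)
### ---------------------------------------
###
### The option values below are hypothetical; in practice a Makefile assembles
### the full command, and the data-source options (frame cache, segment files,
### channel names) are appended by datasource.append_options()::
###
###     gstlal_inspiral_pipe \
###         --template-bank H1L1-TMPLTBANK.xml.gz \
###         --bank-cache H1=H1_split_bank.cache,L1=L1_split_bank.cache \
###         --overlap 20 \
###         --flow 15 \
###         --samples-min 512 \
###         --time-slide-file tisi.xml \
###         --inj-time-slide-file inj_tisi.xml \
###         --web-dir ~/public_html/triggers
###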


"""
This program makes a dag to run gstlal_inspiral offline
"""

__author__ = 'Chad Hanna <chad.hanna@ligo.org>'

##############################################################################
# import standard modules
import sys, os, stat
import itertools
import numpy
from optparse import OptionParser

##############################################################################
# import the modules we need to build the pipeline
import lal
import lal.series
from lal.utils import CacheEntry
from ligo import segments
from ligo.lw import ligolw
from ligo.lw import lsctables
import ligo.lw.utils as ligolw_utils
import ligo.lw.utils.segments as ligolw_segments
from gstlal import inspiral, inspiral_pipe
from gstlal import dagparts
from gstlal import datasource
from gstlal import paths as gstlal_config_paths

class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	pass
lsctables.use_in(LIGOLWContentHandler)


#
# Utility functions
#


def sim_tag_from_inj_file(injections):
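	# derive a tag from the injection file name, e.g. (hypothetical)
	# 'bns-injections.xml.gz' -> 'bns_injections'; any directory part is preserved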
	if injections is None:
		return None
	return injections.replace('.xml', '').replace('.gz', '').replace('-','_')

def get_bank_params(bank_cache, options, verbose = False):
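	# scan the split bank caches: record the longest template duration seen and,
	# for each split bank file, the (min, max) mchirp of the templates that remain
	# after clipping options.overlap[n]/2 templates off each edge of the sub bank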
	max_time = 0
	template_mchirp_dict = {}
	for n, cache in enumerate(bank_cache.values()[0]):
		for ce in map(CacheEntry, open(cache)):
			for ce in map(CacheEntry, open(ce.path)):
				xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler)
				snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc)
				max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration')))
				template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[options.overlap[n]/2:-options.overlap[n]/2]), max(snglinspiraltable.getColumnByName('mchirp')[options.overlap[n]/2:-options.overlap[n]/2])]
				xmldoc.unlink()

	return max_time, template_mchirp_dict

def subdir_path(dirlist):
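	# join the path components, create the directory if it does not already
	# exist (errors from os.mkdir are silently ignored) and return the path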
	output_path = '/'.join(dirlist)
	try:
		os.mkdir(output_path)
	except:
		pass
	return output_path

#
# get a dictionary of all the disjoint 2+ detector combination segments
#

def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
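	# return a segmentlistdict, keyed by frozenset of instrument combination,
	# of the times during which exactly that combination is analyzable; each
	# list is padded by start_pad and split into chunks by dagparts.breakupsegs()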
	segsdict = segments.segmentlistdict()
	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
	start_pad = 512 + max_template_length
	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
	segment_length = int(5 * start_pad)
	for n in range(min_instruments, 1 + len(analyzable_instruments_set)):
		for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n):
			# never analyze H1H2 or H2L1 times
			#if set(ifo_combos) == set(('H1', 'H2')) or set(ifo_combos) == set(('L1', 'H2')):
			#	print >> sys.stderr, "not analyzing: ", ifo_combos, " only time"
			#	continue
			segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos))
			segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg])
			segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad)
			segsdict[frozenset(ifo_combos)] = dagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad)
			if not segsdict[frozenset(ifo_combos)]:
				del segsdict[frozenset(ifo_combos)]
	return segsdict

def psd_node_gen(refPSDJob, dag, parent_nodes, segsdict, channel_dict, options):
	psd_nodes = {}
	for ifos in segsdict:
		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
		for seg in segsdict[ifos]:
			psd_nodes[(ifos, seg)] = \
				dagparts.DAGNode(refPSDJob, dag, parent_nodes = parent_nodes,
					opts = {"gps-start-time":int(seg[0]),
						"gps-end-time":int(seg[1]),
						"data-source":"frames",
						"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
						"psd-fft-length":options.psd_fft_length,
						"frame-segments-name": options.frame_segments_name},
					input_files = {	"frame-cache":options.frame_cache,
							"frame-segments-file":options.frame_segments_file},
					output_files = {"write-psd":dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = subdir_path([refPSDJob.output_path, str(int(seg[0]))[:5]]))}
				)
	return psd_nodes

def inj_psd_node_gen(segsdict, options):
	psd_nodes = {}
	psd_cache_files = {}
	for ce in map(CacheEntry, open(options.psd_cache)):
		psd_cache_files.setdefault(frozenset(lsctables.instrumentsproperty.get(ce.observatory)), []).append((ce.segment, ce.path))
	for ifos in segsdict:
		reference_psd_files = sorted(psd_cache_files[ifos], key = lambda (s, p): s)
		ref_psd_file_num = 0
		for seg in segsdict[ifos]:
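			# reference_psd_files is time ordered; advance until we reach the
			# first reference PSD whose span does not start before this segment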
			while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]):
				ref_psd_file_num += 1
			psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1]
	ref_psd_parent_nodes = []
	return psd_nodes, ref_psd_parent_nodes

def model_node_gen(modelJob, dag, parent_nodes, instruments, options, seg, template_bank, psd):
	if options.mass_model_file is None:
		# choose, arbitrarily, the lowest instrument in alphabetical order
		model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = modelJob.output_path)
		model_node = dagparts.DAGNode(modelJob, dag,
			input_files = {"template-bank": template_bank, "reference-psd": psd},
			opts = {"model":options.mass_model},
			output_files = {"output": model_file_name},
			parent_nodes = parent_nodes
		)
		return [model_node], model_file_name
	else:
		return [], options.mass_model_file

svd_to_dtdphi_map = {}
def svd_node_gen(svdJob, dag, parent_nodes, psd, bank_cache, options, seg, template_mchirp_dict):
	svd_nodes = {}
	new_template_mchirp_dict = {}
	for ifo, list_of_svd_caches in bank_cache.items():
		bin_offset = 0
		for j, svd_caches in enumerate(list_of_svd_caches):
			svd_caches = map(CacheEntry, open(svd_caches))
			for i, individual_svd_cache in enumerate(ce.path for ce in svd_caches):
				# First sort out the clipleft, clipright options
				clipleft = []
				clipright = []
				ids = []
				mchirp_interval = (float("inf"), 0)
				individual_svd_cache = map(CacheEntry, open(individual_svd_cache))
				for n, f in enumerate(ce.path for ce in individual_svd_cache):
					# handle template bank clipping
					clipleft.append(options.overlap[j] / 2)
					clipright.append(options.overlap[j] / 2)
					ids.append("%d_%d" % (i+bin_offset, n))
					if f in template_mchirp_dict:
						mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))
				svd_to_dtdphi_map[i+bin_offset] = options.dtdphi_file[j]
				svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = svdJob.output_path)
				if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
					new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval

				svdnode = dagparts.DAGNode(svdJob, dag,
					parent_nodes = parent_nodes,
					opts = {"svd-tolerance":options.tolerance,
						"flow":options.flow[j],
						"sample-rate":options.sample_rate,
						"clipleft":clipleft,
						"clipright":clipright,
						"samples-min":options.samples_min[j],
						"samples-max-256":options.samples_max_256,
						"samples-max-64":options.samples_max_64,
						"samples-max":options.samples_max,
						"autocorrelation-length":options.autocorrelation_length,
						"bank-id":ids,
						"identity-transform":options.identity_transform,
						"ortho-gate-fap":0.5},
					input_files = {"reference-psd":psd},
					input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]},
					input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"),
					output_files = {"write-svd":svd_bank_name}
					)
				# impose a priority to help with depth first submission
				svdnode.set_priority(99)
				svd_nodes.setdefault(ifo, []).append(svdnode)
			bin_offset += i+1
	#
	# Plot template/svd bank jobs
	#

	primary_ifo = bank_cache.keys()[0]
	dagparts.DAGNode(plotBanksJob, dag,
				parent_nodes = sum(svd_nodes.values(),[]),
				opts = {"plot-template-bank":"",
					"output-dir": output_dir},
				input_files = {"template-bank-file":options.template_bank}
				)
	return svd_nodes, new_template_mchirp_dict

def create_svd_bank_strings(svd_nodes, instruments = None):
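	# build one comma separated "IFO:svd-bank-file" string per split bank,
	# e.g. (hypothetical) "H1:H1-0000_SVD-...xml.gz,L1:L1-0000_SVD-...xml.gz"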
	# FIXME assume that the number of svd nodes is the same per ifo, a good assumption though
	outstrings = []
	for i in range(len(svd_nodes.values()[0])):
		svd_bank_string = ""
		for ifo in svd_nodes:
			if instruments is not None and ifo not in instruments:
				continue
			try:
				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"])
			except AttributeError:
				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i])
		svd_bank_string = svd_bank_string.strip(",")
		outstrings.append(svd_bank_string)
	return outstrings

def svd_bank_cache_maker(svd_bank_strings, injection = False):
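	# convert per-bin "IFO:file" SVD bank strings back into a flat list of file
	# URLs, suitable for writing an input cache for the inspiral jobs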
	if injection:
		dir_name = "gstlal_inspiral_inj"
	else:
		dir_name = "gstlal_inspiral"
	svd_cache_entries = []
	parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
	for svd_bank_parsed_dict in parsed_svd_bank_strings:
		for filename in svd_bank_parsed_dict.itervalues():
			svd_cache_entries.append(CacheEntry.from_T050017(filename))

	return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries] 

def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):

	inspiral_nodes = {}
	for ifos in segsdict:

		# setup dictionaries to hold the inspiral nodes
		inspiral_nodes[(ifos, None)] = {}	
		ignore = {}
		injection_files = []
		for injections in options.injections:
			min_chirp_mass, max_chirp_mass, injections = injections.split(':')
			injection_files.append(injections)
			min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass)
			inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
			ignore[injections] = []
			for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)):
				if max_chirp_mass <= bounds[0]:
					ignore[injections].append(int(bgbin_index))
					# NOTE putting a break here assumes that the min chirp mass
					# in a subbank increases with bin number, i.e. XXXX+1 has a
					# greater minimum chirpmass than XXXX, for all XXXX. Note
					# that the reverse is not true, bin XXXX+1 may have a lower
					# max chirpmass than bin XXXX.
				elif min_chirp_mass > bounds[1]:
					ignore[injections].append(int(bgbin_index))

		# FIXME choose better splitting?
		numchunks = 50

		# only use a channel dict with the relevant channels
		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)

		# get the svd bank strings
		svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys())

		# get a mapping between chunk counter and bgbin for setting priorities
		bgbin_chunk_map = {}

		for seg in segsdict[ifos]:
			if injection_files:
				output_seg_inj_path = subdir_path([gstlalInspiralInjJob.output_path, str(int(seg[0]))[:5]])
			
			if gstlalInspiralJob is None:
				# injection-only run
				inspiral_nodes[(ifos, None)].setdefault(seg, [None])

			else:
				output_seg_path = subdir_path([gstlalInspiralJob.output_path, str(int(seg[0]))[:5]])
				for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)):
					bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)]
					# setup output names
					output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)]
					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i, s in enumerate(svd_bank_strings)]
					dist_stat_names = [dagparts.T050017_filename(ifos, '%s_DIST_STATS' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i,s in enumerate(svd_bank_strings)]

					for bgbin in bgbin_indices:
						bgbin_chunk_map.setdefault(bgbin, chunk_counter)

					# Calculate the appropriate ht-gate-threshold values according to the scale given
					threshold_values = None
					if options.ht_gate_threshold_linear is not None:
						# A scale is given
						mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
						# use max mchirp in a given svd bank to decide gate threshold
						bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
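						# linearly interpolate: threshold = thresh_min + (thresh_max - thresh_min) * (bank_mchirp - mchirp_min) / (mchirp_max - mchirp_min)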
						threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
					else:
						if options.ht_gate_threshold is not None:
							threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given

					# non injection node
					noninjnode = dagparts.DAGNode(gstlalInspiralJob, dag,
							parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]),
							opts = {"psd-fft-length":options.psd_fft_length,
								"ht-gate-threshold":threshold_values,
								"frame-segments-name":options.frame_segments_name,
								"gps-start-time":int(seg[0]),
								"gps-end-time":int(seg[1]),
								"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
								"tmp-space":dagparts.condor_scratch_space(),
								"track-psd":"",
								"control-peak-time":options.control_peak_time,
								"coincidence-threshold":options.coincidence_threshold,
								"singles-threshold":options.singles_threshold,
								"fir-stride":options.fir_stride,
								"data-source":"frames",
								"local-frame-caching":"",
								"min-instruments":options.min_instruments,
								"reference-likelihood-file":options.reference_likelihood_file
								},
							input_files = {	"time-slide-file":options.time_slide_file,
									"frame-cache":options.frame_cache,
									"frame-segments-file":options.frame_segments_file,
									"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
									"blind-injections":options.blind_injections,
									"veto-segments-file":options.vetoes,
								},
							input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)},
							output_cache_files = {
									"output-cache":output_names,
									"ranking-stat-output-cache":dist_stat_names
								}
							)
					# Set a post script to check for file integrity
					if options.gzip_test:
						noninjnode.set_post_script("gzip_test.sh")
						noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))
					# impose a priority to help with depth first submission
					noninjnode.set_priority(chunk_counter+15)
					inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)

			# process injections
			for injections in injection_files:
				# setup output names
				sim_name = sim_tag_from_inj_file(injections)

				bgbin_svd_bank_strings = [index_bank_string_tuple for i, index_bank_string_tuple in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]]

				for chunk_counter, bgbin_list in enumerate(dagparts.groups(bgbin_svd_bank_strings, numchunks)):
					bgbin_indices, svd_bank_strings = zip(*bgbin_list)
					output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices]
					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (bgbin_index, sim_name), seg, '.xml.gz', path = output_paths[i]) for i, bgbin_index in enumerate(bgbin_indices)]
					svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))]
					try:
						reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
						parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
					except AttributeError:
						# injection-only run
						reference_psd = psd_nodes[(ifos, seg)]
						parents = []

					# Calculate the appropriate ht-gate-threshold values according to the scale given
					threshold_values = None
					if options.ht_gate_threshold_linear is not None:
						# A scale is given
						mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
						# use max mchirp in a given svd bank to decide gate threshold
						bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
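						# linearly interpolate: threshold = thresh_min + (thresh_max - thresh_min) * (bank_mchirp - mchirp_min) / (mchirp_max - mchirp_min)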
						threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
					else:
						if options.ht_gate_threshold is not None:
							threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given

					# setup injection node
					injnode = dagparts.DAGNode(gstlalInspiralInjJob, dag, parent_nodes = parents,
							opts = {"psd-fft-length":options.psd_fft_length,
								"ht-gate-threshold":threshold_values,
								"frame-segments-name":options.frame_segments_name,
								"gps-start-time":int(seg[0]),
								"gps-end-time":int(seg[1]),
								"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
								"tmp-space":dagparts.condor_scratch_space(),
								"track-psd":"",
								"control-peak-time":options.control_peak_time,
								"coincidence-threshold":options.coincidence_threshold,
								"singles-threshold":options.singles_threshold,
								"fir-stride":options.fir_stride,
								"data-source":"frames",
								"local-frame-caching":"",
								"min-instruments":options.min_instruments,
								"reference-likelihood-file":options.reference_likelihood_file
								},
							input_files = {	"time-slide-file":options.inj_time_slide_file,
									"frame-cache":options.frame_cache,
									"frame-segments-file":options.frame_segments_file,
									"reference-psd":reference_psd,
									"veto-segments-file":options.vetoes,
									"injections": injections
								},
							input_cache_files = {"svd-bank-cache":svd_names},
							input_cache_file_name = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017(filename) for filename in svd_names], '.cache').replace('SVD', 'SVD_%s' % sim_name),
							output_cache_files = {
									"output-cache":output_names
								}
							)
					# Set a post script to check for file integrity
					if options.gzip_test:
						injnode.set_post_script("gzip_test.sh")
						injnode.add_post_script_arg(" ".join(output_names))
					# impose a priority to help with depth first submission
					if bgbin_chunk_map:
						injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1)
					else:
						injnode.set_priority(chunk_counter+1)
					inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)

	# Replace mchirplo:mchirphi:inj.xml with inj.xml
	options.injections = [inj.split(':')[-1] for inj in options.injections]
	return inspiral_nodes

def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):

	# first get the previous output in a usable form
	lloid_output = {}
	for inj in options.injections + [None]:
		lloid_output[sim_tag_from_inj_file(inj)] = {}
	lloid_diststats = {}
	if options.dist_stats_cache:
		for ce in map(CacheEntry, open(options.dist_stats_cache)):
			lloid_diststats[ce.description.split("_")[0]] = [ce.path]
	for ifos in segsdict:
		for seg in segsdict[ifos]:
			# iterate over the mass space chunks for each segment
			for node in inspiral_nodes[(ifos, None)][seg]:
				if node is None:
					break
				len_out_files = len(node.output_files["output-cache"])
				for f in node.output_files["output-cache"]:
					# Store the output files and the node for use as a parent dependency
					lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node]))
				for f in node.output_files["ranking-stat-output-cache"]:
					lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f)
			for inj in options.injections:
				for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]:
					if injnode is None:
						continue
					for f in injnode.output_files["output-cache"]:
						# Store the output files and the node and injnode for use as a parent dependencies
						bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
						try:
							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode]))
						except KeyError:
							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode]))

	return lloid_output, lloid_diststats

def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWithZerolagJob, calcLikelihoodJob, calcLikelihoodJobInj, lalappsRunSqliteJob, toSqliteJob, marginalizeJob, svd_nodes, inspiral_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, mass_model_add_node, mass_model_file, ref_psd):

	likelihood_nodes = {}
	rankpdf_nodes = []
	rankpdf_zerolag_nodes = []
	outnodes = {}
	instruments = "".join(sorted(instrument_set))
	margnodes = {}

	# NOTE: we rely on there being identical templates in each instrument, so we just take one of the values of svd_nodes, which is a dictionary
	one_ifo_svd_nodes = svd_nodes.values()[0]
	# Here n counts the bins
	# first non-injections, which will get skipped if this is an injections-only run
	for n, (outputs, diststats) in enumerate((lloid_output[None][key], lloid_diststats[key]) for key in sorted(lloid_output[None].keys())):
		inputs = [o[0] for o in outputs]
		parents = []
		[parents.extend(o[1]) for o in outputs]
		# FIXME we keep this here in case we someday want to have a
		# mass bin dependent prior, but it really doesn't matter for
		# the time being.
		priornode = dagparts.DAGNode(createPriorDistStatsJob, dag,
				parent_nodes = [one_ifo_svd_nodes[n]] + mass_model_add_node or [],
				opts = {"instrument":instrument_set, "background-prior":1, "min-instruments":options.min_instruments, "df": "bandwidth", "coincidence-threshold":options.coincidence_threshold},
				input_files = {"svd-file":one_ifo_svd_nodes[n].output_files["write-svd"], "mass-model-file":mass_model_file, "dtdphi-file":svd_to_dtdphi_map[n], "psd-xml": ref_psd},
				output_files = {"write-likelihood":dagparts.T050017_filename(instruments, '%04d_CREATE_PRIOR_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = createPriorDistStatsJob.output_path)}
			)
		# Create a file that has the priors *and* all of the diststats
		# for a given bin marginalized over time. This is all that will
		# be needed to compute the likelihood
		diststats_per_bin_node = dagparts.DAGNode(marginalizeJob, dag,
			parent_nodes = [priornode] + parents,
			opts = {"marginalize":"ranking-stat"},
			input_cache_files = {"likelihood-cache":diststats + [priornode.output_files["write-likelihood"]]},
			output_files = {"output":dagparts.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = marginalizeJob.output_path)},
			input_cache_file_name = dagparts.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.cache')
			)

		calcranknode = dagparts.DAGNode(calcRankPDFsJob, dag,
				parent_nodes = [diststats_per_bin_node],
				opts = {"ranking-stat-samples":options.ranking_stat_samples},
				input_files = {"":diststats_per_bin_node.output_files["output"]},
				output_files = {"output":dagparts.T050017_filename(instruments, '%04d_CALC_RANK_PDFS' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsJob.output_path)}
			)

		calcrankzerolagnode = dagparts.DAGNode(calcRankPDFsWithZerolagJob, dag,
				parent_nodes = [diststats_per_bin_node],
				opts = {"add-zerolag-to-background":"","ranking-stat-samples":options.ranking_stat_samples},
				input_files = {"":diststats_per_bin_node.output_files["output"]},
				output_files = {"output":dagparts.T050017_filename(instruments, '%04d_CALC_RANK_PDFS_WZL' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsWithZerolagJob.output_path)}
			)

		margnodes['%04d' %(n,)] = diststats_per_bin_node
		rankpdf_nodes.append(calcranknode)
		rankpdf_zerolag_nodes.append(calcrankzerolagnode)
		
		# Break up the likelihood jobs into chunks so that each job processes fewer files (100 at a time)
		likelihood_nodes.setdefault(None,[]).append(
			[dagparts.DAGNode(calcLikelihoodJob, dag,
				parent_nodes = [diststats_per_bin_node],
				opts = {"tmp-space":dagparts.condor_scratch_space()},
				input_files = {"likelihood-url":diststats_per_bin_node.output_files["output"]},
				input_cache_files = {"input-cache":chunked_inputs}
				) for chunked_inputs in dagparts.groups(inputs, 100)]
			)

	# then injections
	for inj in options.injections:
		for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[sim_tag_from_inj_file(inj)][key], lloid_diststats[key], key) for key in sorted(lloid_output[sim_tag_from_inj_file(inj)].keys())):
			if outputs is None:
				continue
			inputs = [o[0] for o in outputs]
			parents = []
			[parents.extend(o[1]) for o in outputs]
			if margnodes:
				parents.append(margnodes[bgbin_index])
				likelihood_url = margnodes[bgbin_index].output_files["output"]
			else:
				likelihood_url = diststats[0]
			# Break up the likelihood jobs into chunks so that each job processes fewer files (100 at a time)
			likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
				[dagparts.DAGNode(calcLikelihoodJobInj, dag,
					parent_nodes = parents,
					opts = {"tmp-space":dagparts.condor_scratch_space()},
					input_files = {"likelihood-url":likelihood_url},
					input_cache_files = {"input-cache":chunked_inputs}
					) for chunked_inputs in dagparts.groups(inputs, 100)]
				)

	
	# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
	files_to_group = 100
	for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
		# Flatten the nodes for this sub bank
		nodes = dagparts.flatten(nodes)
		merge_nodes = []
		# Flatten the input/output files from calc_likelihood
		inputs = dagparts.flatten([node.input_files["input-cache"] for node in nodes])
		if inj is None:
			# process files_to_group files at a time, irrespective of the sub bank they came from, so the jobs take a bit longer to run
			for n in range(0, len(inputs), files_to_group):
				merge_nodes.append(dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = nodes,
					opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
					input_files = {"":inputs[n:n+files_to_group]}
					)
				)
				if options.copy_raw_results:
					merge_nodes[-1].set_pre_script("store_raw.sh")
					merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))

			# Merging all the dbs from the same sub bank
			for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
				db = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite') 
				db = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(db).description[:4]]), db)
				sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = merge_nodes,
					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
					input_cache_files = {"input-cache":inputs},
					output_files = {"database":db},
					input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
				)
				sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
					input_files = {"":db}
				)
				outnodes.setdefault(None, []).append(sqlitenode)
		else:
			# process files_to_group files at a time, irrespective of the sub bank they came from, so the jobs take a bit longer to run
			for n in range(0, len(inputs), files_to_group):
				merge_nodes.append(dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = nodes,
					opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
					input_files = {"":inputs[n:n+files_to_group]}
					)
				)
				if options.copy_raw_results:
					merge_nodes[-1].set_pre_script("store_raw.sh")
					merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))

			# Merging all the dbs from the same sub bank and injection run
			for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
				injdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite')
				injdb = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(injdb).description[:4]]), injdb)
				sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = merge_nodes,
					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
					input_cache_files = {"input-cache":inputs},
					output_files = {"database":injdb},
					input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
				)
				sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
					input_files = {"":injdb}
				)
				outnodes.setdefault(sim_tag_from_inj_file(inj), []).append(sqlitenode)

	# make sure outnodes has a None key, even if its value is an empty list
	outnodes.setdefault(None, [])

	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes

def merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options):
	rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
	rankpdf_zerolag_nodes = []
	outnodes = {}
	if options.num_files_per_background_bin == 1:
		bgbin_lloid_map = {}
		# Get list of all files for each background bin (will be same length for each bin)
		for ce in map(CacheEntry, open(options.lloid_cache)):
			bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)

		if len(bgbin_lloid_map.values()[0]) == 1:
			# Starting with 1:1 mapping between files and bins,
			# thus no merging is needed yet
			outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
				outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]

		else:
			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
				noninjdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
				# merge all of the dbs from the same subbank
				sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [],
					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
					input_cache_files = {"input-cache":dbs},
					output_files = {"database":noninjdb},
					input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
				)

				sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
					input_files = {"":noninjdb}
				)

				outnodes.setdefault(None, []).append(sqlitenode)
			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
				bgbin_lloid_map = {}
				for ce in map(CacheEntry, open(inj_lloid_cache)):
					bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)

				for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
					injdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
					# merge all of the dbs from the same subbank
					sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [],
						opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
						input_cache_files = {"input-cache":dbs},
						output_files = {"database":injdb},
						input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
					)

					sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
						opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
						input_files = {"":injdb}
					)

					outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)

	else:
		# Starting with output of analysis before naming convention
		# update (commit 5efd78fee6b371c999f510d07be33ec64f385695),
		# so all lloid files contain gps times for entire run, and are
		# numbered by iterating through segments in a given bin first
		# (e.g. files 0000 to 0009 may all belong to bin 0000, then
		# files 0010 to 0019 would all belong to bin 0001, etc)
		for ce_list in dagparts.groups(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
			hi_index = ce_list[-1].description.split('_')[0]
			noninjdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))

			# merge all of the dbs from the same subbank
			sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [],
				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
				input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
				output_files = {"database":noninjdb},
				input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
			)

			sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
				input_files = {"":noninjdb}
			)

			outnodes.setdefault(None, []).append(sqlitenode)

		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
			for ce_list in dagparts.groups(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
				hi_index = ce_list[-1].description.split('_')[0]
				injdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))

				# merge all of the dbs from the same subbank
				sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [],
					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
					input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
					output_files = {"database":injdb},
					input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
				)

				sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
					opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
					input_files = {"":injdb}
				)

				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)

	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes

def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSqliteJob, toSqliteNoCacheJob, cpJob, innodes, ligolw_add_nodes, options, instruments):

	num_chunks = 100

	if options.vetoes is None:
		vetoes = []
	else:
		vetoes = [options.vetoes]

	chunk_nodes = []
	dbs_to_delete = []
	# Process the chirp mass bins in chunks to parallelize the merging process
	for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
		try:
			dbs = [node.input_files[""] for node in nodes]
			parents = nodes

		except AttributeError:
			# analysis started at merger step but seeded by lloid files which
			# have already been merged into one file per background
			# bin, thus the analysis will begin at this point
			dbs = nodes
			parents = []

		# Merge the final non injection database into chunks
		noninjdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
		sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = parents,
			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
			input_cache_files = {"input-cache": dbs},
			output_files = {"database":noninjdb},
			input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
		)

		# cluster the final non injection database
		noninjsqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
			opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
			input_files = {"":noninjdb}
		)
		chunk_nodes.append(noninjsqlitenode)
		dbs_to_delete.append(noninjdb)

	# Merge the final non injection database
	outnodes = []
	injdbs = []
	if options.non_injection_db:
		# injection-only run
		noninjdb = options.non_injection_db

	else:
		final_nodes = []
		for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
			sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = nodes,
				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
				input_cache_files = {"input-cache": [node.input_files[""] for node in nodes]},
				output_files = {"database":noninjdb},
				input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
			)

			# cluster the final non injection database
			noninjsqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
				input_files = {"":noninjdb}
			)
			final_nodes.append(noninjsqlitenode)

		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
		sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = final_nodes,
			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
			input_files = {"": (vetoes + [options.frame_segments_file])},
			input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
			output_files = {"database":noninjdb},
			input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
		)

		# cluster the final non injection database
		noninjsqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
			opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
			input_files = {"":noninjdb}
		)

		cpnode = dagparts.DAGNode(cpJob, dag, parent_nodes = [noninjsqlitenode],
			input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
		)

		outnodes.append(cpnode)

	# FIXME far-injections currently doesn't work, either fix it or delete it
	#for injections, far_injections in zip(options.injections, options.far_injections):
	if options.injections:
		iterable_injections = options.injections
	else:
		iterable_injections = options.injections_for_merger

	for injections in iterable_injections:
		# extract only the nodes that were used for injections
		chunk_nodes = []

		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
			try:
				dbs = [injnode.input_files[""] for injnode in injnodes]
				parents = injnodes
			except AttributeError:
				dbs = injnodes
				parents = []

			# Setup the final output names, etc.
			injdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)


			# merge
			sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = parents,
				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
				input_cache_files = {"input-cache":dbs},
				output_files = {"database":injdb},
				input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
			)

			# cluster
			clusternode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
				input_files = {"":injdb}
			)

			chunk_nodes.append(clusternode)
			dbs_to_delete.append(injdb)


		final_nodes = []
		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
			# Setup the final output names, etc.
			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')

			# merge
			sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = injnodes,
				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
				input_cache_files = {"input-cache": [node.input_files[""] for node in injnodes]},
				output_files = {"database":injdb},
				input_cache_file_name = injdb.replace('.sqlite','.cache')
			)

			# cluster
			clusternode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
				input_files = {"":injdb}
			)
			final_nodes.append(clusternode)

		# Setup the final output names, etc.
		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
		injdbs.append(injdb)
		injxml = injdb.replace('.sqlite','.xml.gz')

		# FIXME far-injections currently doesn't work, either fix it or delete it
		'''
		# If there are injections that are too far away to be seen in a separate file, add them now. 
		if far_injections is not None:
			xml_input = [injxml] + [far_injections]
		else:
			xml_input = injxml
		'''
		xml_input = injxml

		# merge
		sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = final_nodes + ligolw_add_nodes,
			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
			input_files = {"": (vetoes + [options.frame_segments_file, injections])},
			input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
			output_files = {"database":injdb},
			input_cache_file_name = injdb.replace('.sqlite','.cache')
		)

		# cluster
		clusternode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
			opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
			input_files = {"":injdb}
		)


		clusternode = dagparts.DAGNode(toXMLJob, dag, parent_nodes = [clusternode],
			opts = {"tmp-space":dagparts.condor_scratch_space()},
			output_files = {"extract":injxml},
			input_files = {"database":injdb}
		)

		inspinjnode = dagparts.DAGNode(ligolwInspinjFindJob, dag, parent_nodes = [clusternode],
			opts = {"time-window":0.9},
			input_files = {"":injxml}
		)

		sqlitenode = dagparts.DAGNode(toSqliteNoCacheJob, dag, parent_nodes = [inspinjnode],
			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
			output_files = {"database":injdb},
			input_files = {"":xml_input}
		)

		cpnode = dagparts.DAGNode(cpJob, dag, parent_nodes = [sqlitenode],
			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
		)
			
		outnodes.append(cpnode)

	return injdbs, noninjdb, outnodes, dbs_to_delete

def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, rankpdf_nodes, rankpdf_zerolag_nodes, injdbs, noninjdb, final_sqlite_nodes):
	# compute FAPs and FARs
	# split up the marginalization into groups of margnum (16) files
	try:
		margin = [node.output_files["output"] for node in rankpdf_nodes]
		parents = rankpdf_nodes
		margin_zerolag =  [node.output_files["output"] for node in rankpdf_zerolag_nodes]
		parents_zerolag = rankpdf_zerolag_nodes
	except AttributeError:
		# analysis started at merger step
		margin = rankpdf_nodes
		parents = []
		margin_zerolag = rankpdf_zerolag_nodes
		parents_zerolag = []
	margout = []
	margzerolagout = []
	margnodes = []
	margzerolagnodes = []
	margnum = 16
	for i,n in enumerate(range(0, len(margin), margnum)):
		margout.append(dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin[n:n+margnum]], '.xml.gz', path = marginalizeJob.output_path))
		margnodes.append(dagparts.DAGNode(marginalizeJob, dag, parent_nodes = parents,
			opts = {"marginalize":"ranking-stat-pdf"},
			output_files = {"output":margout[-1]}, 
			input_cache_files = {"likelihood-cache":margin[n:n+margnum]},
			input_cache_file_name = os.path.basename(margout[-1]).replace('.xml.gz','.cache')
		))

		if rankpdf_zerolag_nodes:
			margzerolagout.append(dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin_zerolag[n:n+margnum]], '.xml.gz', path = marginalizeWithZerolagJob.output_path))
			margzerolagnodes.append(dagparts.DAGNode(marginalizeWithZerolagJob, dag, parent_nodes = parents_zerolag,
				opts = {"marginalize":"ranking-stat-pdf"},
				output_files = {"output":margzerolagout[-1]},
				input_cache_files = {"likelihood-cache":margin_zerolag[n:n+margnum]},
				input_cache_file_name = os.path.basename(margzerolagout[-1]).replace('.xml.gz','.cache')
			))


	if options.marginalized_likelihood_file:
		# injection-only run
		parents = final_sqlite_nodes
		parents_zerolag = final_sqlite_nodes
		marginalized_likelihood_file = options.marginalized_likelihood_file
		marginalized_likelihood_with_zerolag_file = options.marginalized_likelihood_with_zerolag_file

	else:
		margnode = dagparts.DAGNode(marginalizeJob, dag, parent_nodes = margnodes,
			opts = {"marginalize":"ranking-stat-pdf"},
			output_files = {"output":"marginalized_likelihood.xml.gz"},
			input_cache_files = {"likelihood-cache":margout},
			input_cache_file_name = "marginalized_likelihood.cache"
		)
		parents = [margnode] + final_sqlite_nodes
		marginalized_likelihood_file = margnode.output_files["output"]

		margnode = dagparts.DAGNode(marginalizeWithZerolagJob, dag, parent_nodes = margzerolagnodes,
			opts = {"marginalize":"ranking-stat-pdf"},
			output_files = {"output":"marginalized_likelihood_with_zerolag.xml.gz"},
			input_cache_files = {"likelihood-cache":margzerolagout},
			input_cache_file_name = "marginalized_likelihood_with_zerolag.cache"
		)
		parents_zerolag = [margnode] + final_sqlite_nodes
		marginalized_likelihood_with_zerolag_file = margnode.output_files["output"]
	
	
	farnode = dagparts.DAGNode(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents,
		opts = {"tmp-space":dagparts.condor_scratch_space()},
		input_files = {"background-bins-file":marginalized_likelihood_file, "injection-db":injdbs, "non-injection-db":noninjdb}
	)
	
	dagparts.DAGNode(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents_zerolag,
		opts = {"tmp-space":dagparts.condor_scratch_space()},
		input_files = {"background-bins-file":marginalized_likelihood_with_zerolag_file, "injection-db":[injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs], "non-injection-db":noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL')}
	)

	return farnode, margout + margzerolagout

def parse_command_line():
	parser = OptionParser(description = __doc__)

	# generic data source options
	datasource.append_options(parser)
	parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s.  Note that 50% will be used for zero-padding.")

	# reference_psd
	parser.add_option("--reference-psd", help = "Don't measure PSDs, use this one instead")

	# Template bank
	parser.add_option("--template-bank", metavar = "filename", help = "Set the template bank xml file.")
	parser.add_option("--mass-model", metavar = "filename", help = "Set the name of the mass model. Options are 'narrow-bns', 'broad-bns', 'bbh', 'ligo', 'detected-logm', 'uniform-template', or 'file'")
	parser.add_option("--mass-model-file", metavar = "filename", help = "Set the name of the mass model file, e.g., mass_model.h5.  Required if --mass-model=file")

	# dtdphi option
	parser.add_option("--dtdphi-file", metavar = "filename", action = "append", help = "dtdphi snr ratio pdfs to read from (hdf5 format)")

	# SVD bank construction options
	parser.add_option("--overlap", metavar = "num", type = "int", action = "append", help = "set the factor that describes the overlap of the sub banks, must be even!")
	parser.add_option("--autocorrelation-length", type = "int", default = 201, help = "The minimum number of samples to use for auto-chisquared, default 201 should be odd")
	parser.add_option("--samples-min", type = "int", action = "append", help = "The minimum number of samples to use for time slices default 1024 (can be given multiple times, one for each time --bank-cache is invoked)")
	parser.add_option("--samples-max-256", type = "int", default = 1024, help = "The maximum number of samples to use for time slices with frequencies above 256Hz, default 1024")
	parser.add_option("--samples-max-64", type = "int", default = 2048, help = "The maximum number of samples to use for time slices with frequencies above 64Hz, default 2048")
	parser.add_option("--samples-max", type = "int", default = 4096, help = "The maximum number of samples to use for time slices with frequencies below 64Hz, default 4096")
	parser.add_option("--bank-cache", metavar = "filenames", action = "append", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc.. (can be given multiple times)")
	parser.add_option("--tolerance", metavar = "float", type = "float", default = 0.9999, help = "set the SVD tolerance, default 0.9999")
	parser.add_option("--flow", metavar = "num", type = "float", action = "append", help = "set the low frequency cutoff. Can be given multiple times.")
        parser.add_option("--fmax", metavar = "num", type = "float", default = 1600, help = "set the max frequency cutoff, default 1600 (Hz)")
	parser.add_option("--sample-rate", metavar = "Hz", type = "int", help = "Set the sample rate.  If not set, the sample rate will be based on the template frequency.  The sample rate must be at least twice the highest frequency in the templates. If provided it must be a power of two")
	parser.add_option("--identity-transform", action = "store_true", help = "Use identity transform, i.e. no SVD")
	
	# trigger generation options
	parser.add_option("--vetoes", metavar = "filename", help = "Set the veto xml file.")
	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
	parser.add_option("--inj-time-slide-file", metavar = "filename", help = "Set the time slide table xml file for injections")
	parser.add_option("--web-dir", metavar = "directory", help = "Set the web directory like /home/USER/public_html")
	parser.add_option("--fir-stride", type="int", metavar = "secs", default = 8, help = "Set the duration of the fft output blocks, default 8")
	parser.add_option("--control-peak-time", type="int", default = 8, metavar = "secs", help = "Set the peak finding time for the control signal, default 8")
	parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005).  The light-travel time between instruments will be added automatically in the coincidence test.")
	parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
	parser.add_option("--reference-likelihood-file", metavar = "file", help = "Set a reference likelihood file to compute initial likelihood ratios. Required")
	parser.add_option("--num-banks", metavar = "str", help = "The number of parallel subbanks per gstlal_inspiral job. can be given as a list like 1,2,3,4 then it will split up the bank cache into N groups with M banks each.")
	parser.add_option("--max-inspiral-jobs", type="int", metavar = "jobs", help = "Set the maximum number of gstlal_inspiral jobs to run simultaneously, default no constraint.")
	parser.add_option("--ht-gate-threshold", type="float", help="set a threshold on whitened h(t) to veto glitches")
	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
	parser.add_option("--inspiral-executable", default = "gstlal_inspiral", help = "Options gstlal_inspiral | gstlal_iir_inspiral, default gstlal_inspiral")
	parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently.  Has the effect of having hidden signals in the input data. Separate injection runs using the --injections option will still occur.")
	# FIXME: far-injections currently doesn't work; either fix it or delete it
	#parser.add_option("--far-injections", action = "append", help = "Injection files with injections too far away to be seen and are not filtered. Required. See https://www.lsc-group.phys.uwm.edu/ligovirgo/cbcnote/NSBH/MdcInjections/MDC1 for example.")
	parser.add_option("--singles-threshold", default=float("inf"), action = "store", metavar="THRESH", help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
	parser.add_option("--copy-raw-results", default=False, action = "store_true", help = "Copy raw gstlal_inspiral results before applying clustering and other lossy operations.")
	parser.add_option("--gzip-test", default=False, action = "store_true", help = "Perform gzip --test on all output files.")
	parser.add_option("--verbose", action = "store_true", help = "Be verbose")
	parser.add_option("--disable-calc-inj-snr", default=False, action = "store_true", help = "Disable injection SNR calculation")
	parser.add_option("--ranking-stat-samples", metavar = "N", default = 2**24, type = "int", help = "Construct ranking statistic histograms by drawing this many samples from the ranking statistic generator (default = 2^24).")
	# Override the datasource injection option
	parser.remove_option("--injections")
	parser.add_option("--injections", action = "append", help = "append injection files to analyze. Must prepend filename with X:Y:, where X and Y are floats, e.g. 1.2:3.1:filename, so that the injections are only searched for in regions of the template bank with X <= chirp mass < Y.")
	# Data from a zero lag run in the case of an injection-only run.
	parser.add_option("--dist-stats-cache", metavar = "filename", help = "Set the cache file for dist stats (required iff running injection-only analysis)")
	parser.add_option("--svd-bank-cache", metavar = "filename", help = "Set the cache file for svd banks (required iff running injection-only analysis)")
	parser.add_option("--psd-cache", metavar = "filename", help = "Set the cache file for psd (required iff running injection-only analysis)")
	parser.add_option("--non-injection-db", metavar = "filename", help = "Set the non injection data base file (required iff running injection-only analysis)")
	parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
	parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")
	# Data from a previous run in the case of a run that starts at the merger step
	parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
	parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
	parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
	parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
	parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
	# Condor commands
	parser.add_option("--request-cpu", default = "4", metavar = "integer", help = "set the inspiral CPU count, default = 4")
	parser.add_option("--request-memory", default = "7GB", metavar = "integer", help = "set the inspiral memory, default = 7GB")
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
	options, filenames = parser.parse_args()
	if options.template_bank:
		bank_xmldoc = ligolw_utils.load_filename(options.template_bank, verbose = options.verbose, contenthandler = LIGOLWContentHandler)
		sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(bank_xmldoc)
		assert len(sngl_inspiral_table) == len(set([r.template_id for r in sngl_inspiral_table])), "Template bank ids are not unique"
	if options.mass_model not in ("narrow-bns", "broad-bns", "bbh", "ligo", "detected-logm", "uniform-template", "file"):
		raise ValueError("--mass-model must be 'narrow-bns', 'broad-bns', 'bbh', 'ligo', 'detected-logm', 'uniform-template', or 'file'")
	if options.mass_model == "file" and not options.mass_model_file:
		raise ValueError("--mass-model-file must be provided if --mass-model=file")
	if not options.dtdphi_file:
		options.dtdphi_file = [os.path.join(gstlal_config_paths["pkgdatadir"], "inspiral_dtdphi_pdf.h5")]*len(options.overlap)
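		# the packaged default is reused once per bank cache so that the
		# one-dtdphi-file-per-bank consistency check below is satisfied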

	if len(options.dtdphi_file) != len(options.overlap):
		raise ValueError("You must provide as many dtdphi files as banks")

	if options.num_banks:
		options.num_banks = [int(v) for v in options.num_banks.split(",")]
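	# e.g. (illustrative), "4,4,5" becomes [4, 4, 5]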
	if options.sample_rate is not None and numpy.log2(options.sample_rate) != int(numpy.log2(options.sample_rate)):
		raise ValueError("--sample-rate must be a power of two")
	if not options.samples_min and not options.svd_bank_cache:
		options.samples_min = [1024]*len(options.bank_cache)
	if not options.overlap and not options.svd_bank_cache:
		options.overlap = [0]*len(options.bank_cache)
	if not options.svd_bank_cache and any(overlap % 2 for overlap in options.overlap):
		raise ValueError("overlap must be even")

	if not options.svd_bank_cache and not (len(options.samples_min) == len(options.bank_cache) == len(options.overlap)):
		raise ValueError("must provide same number of inputs for --samples-min, --bank-cache, --overlap")

	missing_injection_options = []
	for option in ("dist_stats_cache", "svd_bank_cache", "psd_cache", "non_injection_db", "marginalized_likelihood_file", "marginalized_likelihood_with_zerolag_file"):
		if getattr(options, option) is None:
			missing_injection_options.append(option)
	if len(missing_injection_options) > 0 and len(missing_injection_options) < 6:
		raise ValueError("missing injection-only options %s." % ", ".join([option for option in missing_injection_options]))
	if len(missing_injection_options) == 0 and options.num_banks:
		raise ValueError("cant specify --num-banks in injection-only run")

	missing_merger_options = []
	for option in ("lloid_cache", "inj_lloid_cache", "rank_pdf_cache"):
		# an unset option or an empty --inj-lloid-cache list both count as missing
		if not getattr(options, option):
			missing_merger_options.append(option)
	if len(missing_merger_options) > 0 and len(missing_merger_options) < 3:
		raise ValueError("missing merger-run options %s." % ", ".join(missing_merger_options))
	if len(missing_merger_options) == 0 and options.num_banks:
		raise ValueError("cannot specify --num-banks in a run starting at the merger step")

	fail = ""
	required_options = []
	if len(missing_merger_options) == 3:
		required_options.append("reference_likelihood_file")
		if len(missing_injection_options) == 6:
			required_options.append("bank_cache")

	for option in required_options:
		if getattr(options, option) is None:
			fail += "must provide option %s\n" % (option)
	if fail: raise ValueError(fail)

	# FIXME: far-injections currently doesn't work; either fix it or delete it
	'''
	if options.far_injections is not None and len(options.injections) != len(options.far_injections):
		raise ValueError("number of injection files and far injection files must be equal")
	if options.far_injections is None:
		options.far_injections = [None for inj in options.injections]
	'''


	#FIXME a hack to find the sql paths
	share_path = os.path.split(dagparts.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal')
	options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
	options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')
	options.injection_proc_sql_file = os.path.join(share_path, 'simplify_proc_table_in_inj_file.sql')
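	# e.g. (illustrative prefix): if gstlal_inspiral is found in /opt/gstlal/bin, then
	# options.cluster_sql_file becomes /opt/gstlal/share/gstlal/simplify_and_cluster.sql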

	return options, filenames

