From 0e898267204b431dff6bacd9b002afbc378cb329 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Mon, 26 Aug 2019 07:37:33 -0700
Subject: [PATCH] gstlal_inspiral_dag: revert f21a0269, f3ee4be4, 5dc20a30
 associated with DAG reorg

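This restores the pre-reorg post-processing workflow: per-bin
gstlal_inspiral_calc_likelihood / gstlal_inspiral_calc_likelihood_inj
jobs, the cache_to_db, merge_cluster_layer, merge_in_bin_layer and
sql_cluster_and_merge_layer helpers, and the --copy-raw-results and
--num-files-per-background-bin options. It drops the
gstlal_inspiral_calc_likelihood_by_bin and
gstlal_inspiral_merge_and_reduce jobs, the cluster_and_merge_layer and
likelihood_from_cache_layer stages, the snr_simplify_and_cluster SQL
files and the --postprocess-only option introduced by the reorg.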
---
 gstlal-inspiral/bin/gstlal_inspiral_dag | 521 ++++++++++++++----------
 1 file changed, 314 insertions(+), 207 deletions(-)

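Note: the restored merge_in_bin_layer() groups legacy LLOID files into
background bins of --num-files-per-background-bin files each, and
cache_to_db() names the merged database after the bin index derived
from the highest file index in the group. A minimal sketch of that
index arithmetic (standalone Python with a hypothetical helper name,
not the dagparts API):

    # With N files per background bin, files 0000..000(N-1) merge into
    # bin 0000, files 000N..00(2N-1) into bin 0001, and so on
    # (hypothetical illustration of the cache_to_db() arithmetic).
    def bin_index_for_group(highest_file_index, files_per_bin):
        return (highest_file_index + 1) // files_per_bin - 1

    assert bin_index_for_group(9, 10) == 0   # files 0000-0009 -> bin 0000
    assert bin_index_for_group(19, 10) == 1  # files 0010-0019 -> bin 0001
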
diff --git a/gstlal-inspiral/bin/gstlal_inspiral_dag b/gstlal-inspiral/bin/gstlal_inspiral_dag
index eed28595da..e894ab5f35 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_dag
+++ b/gstlal-inspiral/bin/gstlal_inspiral_dag
@@ -263,6 +263,7 @@ def parse_command_line():
 	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
 	parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently.  Has the effect of having hidden signals in the input data. Separate injection runs using the --injections option will still occur.")
 	parser.add_option("--singles-threshold", default=float("inf"), action = "store", metavar="THRESH", help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
+	parser.add_option("--copy-raw-results", default=False, action = "store_true", help = "Copy raw gstlal_inspiral results before applying clustering and other lossy operations.")
 	parser.add_option("--gzip-test", default=False, action = "store_true", help = "Perform gzip --test on all output files.")
 	parser.add_option("--verbose", action = "store_true", help = "Be verbose")
 	parser.add_option("--disable-calc-inj-snr", default=False, action = "store_true", help = "Disable injection SNR calculation")
@@ -280,11 +281,11 @@ def parse_command_line():
 	parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
 	parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")
 
-	# Data from a previous run in the case of a run that starts at the post-processing step
-	parser.add_option("--postprocess-only", action = "store_true", default=False, help = "Start the analysis at the post-processing step.")
+	# Data from a previous run in the case of a run that starts at the merger step
 	parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
 	parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
 	parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
+	parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
 	parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
 
 	# Condor commands
@@ -355,8 +356,6 @@ def parse_command_line():
 
 	#FIXME a hack to find the sql paths
 	share_path = os.path.split(dagparts.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal')
-	options.snr_cluster_sql_file = os.path.join(share_path, 'snr_simplify_and_cluster.sql')
-	options.inj_snr_cluster_sql_file = os.path.join(share_path, 'inj_snr_simplify_and_cluster.sql')
 	options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
 	options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')
 	options.injection_proc_sql_file = os.path.join(share_path, 'simplify_proc_table_in_inj_file.sql')
@@ -387,6 +386,12 @@ def inputs_to_db(jobs, inputs):
 	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
 	return os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
 
+def cache_to_db(cache, jobs):
+	hi_index = cache[-1].description.split('_')[0]
+	db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path))
+	db = db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
+	return db
+
 def get_rank_file(instruments, boundary_seg, n, basename, job=None):
 	if job:
 		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path)
@@ -400,7 +405,6 @@ def set_up_jobs(options):
 	base_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})
 	ref_psd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
 	calc_rank_pdf_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})
-	calc_likelihood_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"24GB", "want_graceful_removal":"True", "kill_sig":"15"})
 	svd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"})
 	inj_snr_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
 	sh_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})
@@ -417,7 +421,7 @@ def set_up_jobs(options):
 		jobs['createPriorDistStats'] = None
 		jobs['calcRankPDFs'] = None
 		jobs['calcRankPDFsWithZerolag'] = None
-		jobs['calcLikelihoodByBin'] = None
+		jobs['calcLikelihood'] = None
 		jobs['marginalize'] = None
 		jobs['marginalizeWithZerolag'] = None
 
@@ -439,7 +443,7 @@ def set_up_jobs(options):
 		jobs['createPriorDistStats'] = dagparts.DAGJob("gstlal_inspiral_create_prior_diststats", condor_commands = base_condor_commands)
 		jobs['calcRankPDFs'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", condor_commands = calc_rank_pdf_condor_commands)
 		jobs['calcRankPDFsWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", tag_base="gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands=calc_rank_pdf_condor_commands)
-		jobs['calcLikelihoodByBin'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", condor_commands = calc_likelihood_condor_commands)
+		jobs['calcLikelihood'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", condor_commands = base_condor_commands)
 		jobs['marginalize'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = base_condor_commands)
 		jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base="gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands=base_condor_commands)
 
@@ -448,8 +452,7 @@ def set_up_jobs(options):
 	jobs['injSplitter'] = dagparts.DAGJob("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = base_condor_commands)
 	jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
 	jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
-	jobs['mergeAndReduce'] = dagparts.DAGJob("gstlal_inspiral_merge_and_reduce", condor_commands = base_condor_commands)
-	jobs['calcLikelihoodByBinInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", tag_base='gstlal_inspiral_calc_likelihood_by_bin_inj', condor_commands=calc_likelihood_condor_commands)
+	jobs['calcLikelihoodInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands=base_condor_commands)
 	jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
 	jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
 	jobs['toSqlite'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = base_condor_commands)
@@ -780,7 +783,7 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
 	# NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
 	lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)
 
-	return lloid_output, lloid_diststats
+	return inspiral_nodes, lloid_output, lloid_diststats
 
 def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs):
 	ligolw_add_nodes = []
@@ -900,14 +903,18 @@ def summary_plot_layer(dag, jobs, parent_nodes, options):
 
 	return plotnodes
 
-def clean_intermediate_products_layer(dag, jobs, nodes, files_to_delete):
-	"""clean intermediate products
+def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete):
+	"""clean intermediate merger products
 	"""
-	for file_ in files_to_delete:
-		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = nodes,
-			input_files = {"": file_}
+	for db in dbs_to_delete:
+		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
+			input_files = {"": db}
 		)
 
+	for margfile in margfiles_to_delete:
+		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
+			input_files = {"": margfile}
+		)
 	return None
 
 def inj_psd_layer(segsdict, options):
@@ -941,6 +948,29 @@ def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
 	else:
 		return [], options.mass_model_file
 
+def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None):
+	"""merge and cluster from sqlite database
+	"""
+	if input_files:
+		input_ = {"": input_files}
+	else:
+		input_ = {}
+
+	# merge the input files into a single database
+	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
+		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+		input_files = input_,
+		input_cache_files = {"input-cache": db_cache},
+		output_files = {"database":db},
+		input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
+	)
+
+	# cluster database
+	return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
+		opts = {"sql-file": sqlfile, "tmp-space": dagparts.condor_scratch_space()},
+		input_files = {"": db}
+	)
+
 def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file): 
 	instruments = "".join(sorted(instrument_set))
 	margnodes = {}
@@ -1007,200 +1037,282 @@ def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument
 
 	return rankpdf_nodes, rankpdf_zerolag_nodes
 
-def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set):
+def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
 	likelihood_nodes = {}
 	instruments = "".join(sorted(instrument_set))
-	chunk_size = 128
-
-	# convert final db to ligolw
-	noninjxml = noninjdb.replace('.sqlite','.xml.gz')
-	dbnode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = final_sqlite_nodes[None],
-		opts = {"tmp-space": dagparts.condor_scratch_space()},
-		output_files = {"extract": noninjxml},
-		input_files = {"database": noninjdb}
-	)
+	chunk_size = 16
 
-	parent_node = dbnode
-	for bgbin_indices in dagparts.groups(sorted(lloid_output[None].keys()), chunk_size):
-		parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
-			parent_nodes = [parent_node] + [marg_nodes[idx] for idx in bgbin_indices],
-			opts = {"tmp-space": dagparts.condor_scratch_space()},
-			input_files = {
-				"likelihood-url": [marg_nodes[idx].output_files["output"] for idx in bgbin_indices],
-				"": dbnode.output_files["extract"],
-				"force": "",
-			}
-		)
-	likelihood_nodes[None] = [parent_node]
+	bgbin_indices = sorted(lloid_output[None].keys())
+	for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
+		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, '%04d'%n)
+		inputs = [o[0] for o in outputs]
+
+		# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
+		likelihood_nodes.setdefault(None,[]).append(
+			[dagparts.DAGNode(jobs['calcLikelihood'], dag,
+				parent_nodes = [marg_nodes[bgbin_index]],
+				opts = {"tmp-space": dagparts.condor_scratch_space()},
+				input_files = {"likelihood-url": marg_nodes[bgbin_index].output_files["output"]},
+				input_cache_files = {"input-cache": chunked_inputs}
+				) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
+			)
 
 	# then injections
-	for inj, injdb in zip(options.injections, injdbs):
+	for inj in options.injections:
 		lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
+		bgbin_indices = sorted(lloid_nodes.keys())
+		for n, (outputs, diststats, bgbin_index) in enumerate((lloid_nodes[key], lloid_diststats[key], key) for key in bgbin_indices):
+			if outputs is not None:
+				inputs = [o[0] for o in outputs]
+				parents = dagparts.flatten([o[1] for o in outputs])
+				if marg_nodes[bgbin_index]:
+					parents.append(marg_nodes[bgbin_index])
+					likelihood_url = marg_nodes[bgbin_index].output_files["output"]
+				else:
+					likelihood_url = diststats[0]
+
+				# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
+				likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
+					[dagparts.DAGNode(jobs['calcLikelihoodInj'], dag,
+						parent_nodes = parents,
+						opts = {"tmp-space": dagparts.condor_scratch_space()},
+						input_files = {"likelihood-url": likelihood_url},
+						input_cache_files = {"input-cache": chunked_inputs}
+						) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
+					)
 
-		# convert final db to ligolw
-		injxml = injdb.replace('.sqlite','.xml.gz')
-		injdbnode = dagparts.DAGNode(jobs['toXML'], dag,
-			parent_nodes = final_sqlite_nodes[sim_tag_from_inj_file(inj)],
-			opts = {"tmp-space": dagparts.condor_scratch_space()},
-			output_files = {"extract": injxml},
-			input_files = {"database": injdb}
-		)
+	return likelihood_nodes
 
-		parent_node = injdbnode
-		for bgbin_indices in dagparts.groups(sorted(lloid_nodes.keys()), chunk_size):
-			parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
-				parent_nodes = [parent_node] + [marg_nodes[idx] for idx in bgbin_indices],
-				opts = {"tmp-space": dagparts.condor_scratch_space()},
-				input_files = {
-					"likelihood-url": [marg_nodes[idx].output_files["output"] for idx in bgbin_indices],
-					"": injdbnode.output_files["extract"],
-					"force": "",
-				}
+def merge_in_bin_layer(dag, jobs, options):
+	rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
+	rankpdf_zerolag_nodes = []
+	outnodes = {}
+	if options.num_files_per_background_bin == 1:
+		bgbin_lloid_map = {}
+		# Get list of all files for each background bin (will be same length for each bin)
+		for ce in map(CacheEntry, open(options.lloid_cache)):
+			bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+
+		if len(bgbin_lloid_map.values()[0]) == 1:
+			# Starting with 1:1 mapping between files and bins,
+			# thus no merging is needed yet
+			outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
+			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
+				outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
+
+		else:
+			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
+				db = inputs_to_db(jobs, dbs)
+				sqlitenode = merge_cluster_layer(dag, jobs, [], db, dbs, options.cluster_sql_file)
+				outnodes.setdefault(None, []).append(sqlitenode)
+
+			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
+				bgbin_lloid_map = {}
+				for ce in map(CacheEntry, open(inj_lloid_cache)):
+					bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+
+				for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
+					injdb = inputs_to_db(jobs, dbs)
+					sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, dbs, options.injection_sql_file)
+					outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
+
+	else:
+		# Starting with the output of an analysis run before the naming convention
+		# update (commit 5efd78fee6b371c999f510d07be33ec64f385695),
+		# so all lloid files contain gps times for the entire run and are
+		# numbered by iterating through segments in a given bin first
+		# (e.g. files 0000 to 0009 may all belong to bin 0000, then
+		# files 0010 to 0019 would all belong to bin 0001, etc.)
+		for ce_list in dagparts.groups(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
+			noninjdb = cache_to_db(ce_list, jobs)
+			# merge all of the dbs from the same subbank
+			sqlitenode = merge_cluster_layer(dag, jobs, [], noninjdb, [ce.path for ce in ce_list], options.cluster_sql_file)
+			outnodes.setdefault(None, []).append(sqlitenode)
+
+		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
+			for ce_list in dagparts.groups(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
+				injdb = cache_to_db(ce_list, jobs)
+				# merge all of the dbs from the same subbank
+				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, [ce.path for ce in ce_list], options.injection_sql_file)
+				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
+
+	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
+
+def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments):
+	innodes = {}
+
+	# after assigning the likelihoods, cluster and merge by sub bank and by whether or not it was an injection run
+	files_to_group = 40
+	for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
+		if inj is None:
+			innode_key = None
+			sql_file = options.cluster_sql_file
+		else:
+			innode_key = sim_tag_from_inj_file(inj)
+			sql_file = options.injection_sql_file
+
+		# Flatten the nodes for this sub bank
+		nodes = dagparts.flatten(nodes)
+		merge_nodes = []
+		# Flatten the input/output files from calc_likelihood
+		inputs = dagparts.flatten([node.input_files["input-cache"] for node in nodes])
+
+		# process files_to_group files at a time, irrespective of the sub bank they came from, so the jobs take a bit longer to run
+		for input_files in dagparts.groups(inputs, files_to_group):
+			merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
+				opts = {"sql-file": sql_file, "tmp-space": dagparts.condor_scratch_space()},
+				input_files = {"": input_files}
+				)
 			)
-		likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
+			if options.copy_raw_results:
+				merge_nodes[-1].set_pre_script("store_raw.sh")
+				merge_nodes[-1].add_pre_script_arg(" ".join(input_files))
 
-	return likelihood_nodes
+		# Merging all the dbs from the same sub bank
+		for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
+			db = inputs_to_db(jobs, inputs)
+			sqlitenode = merge_cluster_layer(dag, jobs, merge_nodes, db, inputs, sql_file)
+			innodes.setdefault(innode_key, []).append(sqlitenode)
 
-def likelihood_from_cache_layer(dag, jobs, options, boundary_seg, instrument_set):
-	# Get list of all files for each background bin (will be same length for each bin)
-	bgbin_likelihood_map = {}
-	for ce in map(CacheEntry, open(options.likelihood_cache)):
-		bgbin_likelihood_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+	# make sure innodes has a None key, even if its value is an empty list
+	innodes.setdefault(None, [])
 
-	bgbin_db_map = {}
-	for ce in map(CacheEntry, open(options.database_cache)):
-		bgbin_db_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+	num_chunks = 50
 
-	likelihood_nodes = {}
-	instruments = "".join(sorted(instrument_set))
-	chunk_size = 128
-
-	# convert noninjdb to ligolw
-	noninjxml = noninjdb.replace('.sqlite','.xml.gz')
-	dbnode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [],
-		opts = {"tmp-space": dagparts.condor_scratch_space()},
-		output_files = {"extract": noninjxml},
-		input_files = {"database": noninjdb}
-	)
+	if options.vetoes is None:
+		vetoes = []
+	else:
+		vetoes = [options.vetoes]
 
-	# first, non-injection jobs
-	parent_node = dbnode
-	for likelihood_urls in dagparts.groups(sorted(bgbin_likelihood_map[None].values()), chunk_size):
-		parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
-			parent_nodes = [parent_node],
-			opts = {"tmp-space": dagparts.condor_scratch_space()},
-			input_files = {
-				"likelihood-url": likelihood_urls,
-				"": dbnode.output_files["extract"],
-				"force": "",
-			}
-		)
-	likelihood_nodes[None] = [parent_node]
+	chunk_nodes = []
+	dbs_to_delete = []
+	# Process the chirp mass bins in chunks to parallelize the merging process
+	for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
+		try:
+			dbs = [node.input_files[""] for node in nodes]
+			parents = nodes
 
-	# then injections
-	for inj, injdb in zip(options.injections, injdbs):
-		all_likelihood_urls = bgbin_likelihood_map[sim_tag_from_inj_file(inj)].values()
+		except AttributeError:
+			# analysis started at merger step but seeded by lloid files which
+			# have already been merged into one file per background
+			# bin, thus the analysis will begin at this point
+			dbs = nodes
+			parents = []
 
-		# convert final db to ligolw
-		injxml = injdb.replace('.sqlite','.xml.gz')
-		injdbnode = dagparts.DAGNode(jobs['toXML'], dag,
-			parent_nodes = final_sqlite_nodes[sim_tag_from_inj_file(inj)],
-			opts = {"tmp-space": dagparts.condor_scratch_space()},
-			output_files = {"extract": injxml},
-			input_files = {"database": injdb}
+		dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
+		noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
+
+		# Merge and cluster this chunk of the non injection databases
+		noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file)
+		chunk_nodes.append(noninjsqlitenode)
+		dbs_to_delete.append(noninjdb)
+
+	# Merge the final non injection database
+	outnodes = []
+	injdbs = []
+	if options.non_injection_db: #### injection-only run
+		noninjdb = options.non_injection_db
+	else:
+		final_nodes = []
+		for chunk, nodes in enumerate(dagparts.groups(innodes[None], 10)):
+			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
+
+			# cluster the final non injection database
+			noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
+			final_nodes.append(noninjsqlitenode)
+
+		input_files = (vetoes + [options.frame_segments_file])
+		input_cache_files = [node.input_files[""] for node in final_nodes]
+		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
+		noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
+
+		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode],
+			input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
 		)
 
-		parent_node = injdbnode
-		for likelihood_urls in dagparts.groups(sorted(all_likelihood_urls), chunk_size):
-			parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
-				parent_nodes = [parent_node],
-				opts = {"tmp-space": dagparts.condor_scratch_space()},
-				input_files = {
-					"likelihood-url": likelihood_urls,
-					"": injdbnode.output_files["extract"],
-					"force": "",
-				}
-			)
-		likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
+		outnodes.append(cpnode)
 
-	return likelihood_nodes
+	# FIXME far-injections currently doesn't work, either fix it or delete it
+	#for injections, far_injections in zip(options.injections, options.far_injections):
+	if options.injections:
+		iterable_injections = options.injections
+	else:
+		iterable_injections = options.injections_for_merger
 
-def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg):
-	"""cluster and merge lloid output into a single database
-	"""
-	sqlite_nodes = {}
-	dbs_to_delete = []
-	instruments = "".join(sorted(instrument_set))
-	chunk_size = 20
+	for injections in iterable_injections:
+		# extract only the nodes that were used for injections
+		chunk_nodes = []
 
-	# define lloid files
-	bgbin_indices = sorted(lloid_output[None].keys())
-	output_lloid = [(file_, node) for key in bgbin_indices for file_, node in lloid_output[None][key]]
-
-	### cluster and merge
-	while len(output_lloid) > 1:
-		input_lloid = output_lloid
-		output_lloid = []
-		for lloid_chunk in dagparts.groups(input_lloid, chunk_size):
-			cache, nodes = zip(*lloid_chunk)
-			nodes = list(set(dagparts.flatten(nodes)))
-
-			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in cache]
-			noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['mergeAndReduce'].output_path)
-
-			# Break up the merge/reduce jobs into chunks to process fewer files, e.g, 10
-			mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
-				parent_nodes = nodes,
-				opts = {"sql-file": options.snr_cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
-				input_files = {},
-				input_cache_files = {"input-cache": cache},
-				output_files = {"database": noninjdb},
-			)
+		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
+			try:
+				dbs = [injnode.input_files[""] for injnode in injnodes]
+				parents = injnodes
+			except AttributeError:
+				dbs = injnodes
+				parents = []
 
-			dbs_to_delete.append(noninjdb)
-			output_lloid.append((noninjdb, [mergenode]))
+			# Setup the final output names, etc.
+			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
+			injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
 
-	sqlite_nodes[None] = dagparts.flatten([o[1] for o in output_lloid])
+			# merge and cluster
+			clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file)
+			chunk_nodes.append(clusternode)
+			dbs_to_delete.append(injdb)
 
-	# then injections
-	injdbs = []
-	for inj in options.injections:
-		sim_tag = sim_tag_from_inj_file(inj)
-
-		# define lloid files
-		bgbin_indices = sorted(lloid_output[sim_tag].keys())
-		output_lloid = [(file_, node) for key in bgbin_indices for file_, node in lloid_output[sim_tag][key]]
-
-		### cluster and merge
-		while len(output_lloid) > 1:
-			input_lloid = output_lloid
-			output_lloid = []
-			for lloid_chunk in dagparts.groups(input_lloid, chunk_size):
-				cache, nodes = zip(*lloid_chunk)
-				nodes = list(set(dagparts.flatten(nodes)))
-
-				dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in cache]
-				injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['mergeAndReduce'].output_path)
-
-				# Break up the merge/reduce jobs into chunks to process fewer files, e.g, 10
-				mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
-					parent_nodes = nodes,
-					opts = {"sql-file": options.inj_snr_cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
-					input_files = {},
-					input_cache_files = {"input-cache": cache},
-					output_files = {"database": injdb},
-				)
 
-				dbs_to_delete.append(injdb)
-				output_lloid.append((injdb, [mergenode]))
+		final_nodes = []
+		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
+			# Setup the final output names, etc.
+			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
 
-		sqlite_nodes[sim_tag] = dagparts.flatten([o[1] for o in output_lloid])
+			# merge and cluster
+			clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.cluster_sql_file)
+			final_nodes.append(clusternode)
+
+		# Setup the final output names, etc.
+		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
 		injdbs.append(injdb)
+		injxml = injdb.replace('.sqlite','.xml.gz')
 
-	# remove final databases from databases considered for deletion
-	dbs_to_delete = [db for db in dbs_to_delete if db not in injdbs and db != noninjdb]
+		# FIXME far-injections currently doesn't work, either fix it or delete it
+		## If there are injections that are too far away to be seen in a separate file, add them now.
+		#if far_injections is not None:
+		#	xml_input = [injxml] + [far_injections]
+		#else:
+		#	xml_input = injxml
+		xml_input = injxml
+
+		# merge and cluster
+		parent_nodes = final_nodes + ligolw_add_nodes
+		input_files = (vetoes + [options.frame_segments_file, injections])
+		input_cache_files = [node.input_files[""] for node in final_nodes]
+		clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
+
+		clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
+			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			output_files = {"extract":injxml},
+			input_files = {"database":injdb}
+		)
 
-	return sqlite_nodes, injdbs, noninjdb, dbs_to_delete
+		inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode],
+			opts = {"time-window":0.9},
+			input_files = {"":injxml}
+		)
+
+		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode],
+			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+			output_files = {"database":injdb},
+			input_files = {"":xml_input}
+		)
+
+		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode],
+			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
+		)
+
+		outnodes.append(cpnode)
+
+	return injdbs, noninjdb, outnodes, dbs_to_delete
 
 def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 	ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes]
@@ -1216,7 +1328,7 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 		try:
 			margin = [node.output_files["output"] for node in nodes]
 			parents = nodes
-		except AttributeError: ### analysis started at post-processing step
+		except AttributeError: ### analysis started at merger step
 			margin = nodes
 			parents = []
 
@@ -1250,20 +1362,19 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 
 	return final_margnodes, dagparts.flatten(all_margcache)
 
-def compute_far_layer(dag, jobs, likelihood_nodes, final_margnodes, injdbs, noninjdb):
+def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes):
 	"""compute FAPs and FARs
 	"""
-	all_likelihood_nodes = dagparts.flatten(likelihood_nodes.values())
 	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
 	filesuffixs = ['', '_with_zerolag']
 
-	for margnode, margfile, filesuffix in zip(final_margnodes, margfiles, filesuffixs):
+	for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
 		if options.marginalized_likelihood_file: ### injection-only run
-			parents = all_likelihood_nodes
+			parents = final_sqlite_nodes
 			marginalized_likelihood_file = margfile
 
 		else:
-			parents = [margnode] + all_likelihood_nodes
+			parents = [margnode] + final_sqlite_nodes
 			marginalized_likelihood_file = margnode.output_files["output"]
 
 		farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents,
@@ -1359,8 +1470,8 @@ if __name__ == '__main__':
 		### reference psd jobs
 		psd_nodes, ref_psd_parent_nodes = inj_psd_layer(segsdict, options)
 
-	elif options.postprocess_only:
-		# postprocess-only analysis, nothing to do here
+	elif options.lloid_cache:
+		# starting analysis at merger step, nothing to do here
 		pass
 
 	elif options.reference_psd is None:
@@ -1381,7 +1492,7 @@ if __name__ == '__main__':
 		ref_psd_parent_nodes = []
 
 	# Calculate Expected SNR jobs
-	if not options.postprocess_only and not options.disable_calc_inj_snr:
+	if not options.lloid_cache and not options.disable_calc_inj_snr:
 		ligolw_add_nodes = expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100)
 	else:
 		ligolw_add_nodes = []
@@ -1393,9 +1504,9 @@ if __name__ == '__main__':
 		# mass model
 		model_node, model_file = mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd)
 
-	if not options.postprocess_only:
+	if not options.lloid_cache:
 		# Inspiral jobs by segment
-		lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
+		inspiral_nodes, lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
 
 		# marginalize jobs
 		marg_nodes = marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file)
@@ -1403,24 +1514,22 @@ if __name__ == '__main__':
 		# calc rank PDF jobs
 		rankpdf_nodes, rankpdf_zerolag_nodes = calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
 
-		# Setup clustering and/or merging
-		sqlite_nodes, injdbs, noninjdb, dbs_to_delete = cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg)
-
-		# likelihood jobs
-		likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set)
 	else:
-		# load rankpdf cache
-		rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
-		rankpdf_zerolag_nodes = []
-
-		# likelihood jobs
-		likelihood_nodes = likelihood_from_cache_layer(dag, jobs, options, boundary_seg, instrument_set)
+		# Merge lloid files into 1 file per bin if not already 1 file per bin
+		rankpdf_nodes, rankpdf_zerolag_nodes, likelihood_nodes = merge_in_bin_layer(dag, jobs, options)
 
 	# final marginalization step
 	final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes)
 
+	# likelihood jobs
+	likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set)
+
+	# Setup clustering and/or merging
+	# after all of the likelihood ranking and preclustering is finished, put everything into single databases based on the injection file (or lack thereof)
+	injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments)
+
 	# Compute FAR
-	farnode = compute_far_layer(dag, jobs, likelihood_nodes, final_marg_nodes, injdbs, noninjdb)
+	farnode = compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes)
 
 	# make summary plots
 	plotnodes = summary_plot_layer(dag, jobs, farnode, options)
@@ -1428,10 +1537,8 @@ if __name__ == '__main__':
 	# make a web page
 	summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs)
 
-	# rm intermediate margfiles/sqlite files
-	clean_intermediate_products_layer(dag, jobs, plotnodes, margfiles_to_delete)
-	clean_intermediate_products_layer(dag, jobs, plotnodes, dbs_to_delete)
-
+	# rm intermediate merger products
+	clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete)
 
 	#
 	# generate DAG files
-- 
GitLab