From f21a0269e9da31953920ae499f1782896bf60bf6 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Tue, 28 May 2019 18:12:01 -0700
Subject: [PATCH] gstlal_inspiral_dag: initial dag reorganization to aid in
 trigger post-processing
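
Replace the per-subbank merge/cluster machinery (cache_to_db,
merge_cluster_layer and sql_cluster_and_merge_layer) with a single
gstlal_inspiral_merge_and_reduce job type and a new
cluster_and_merge_layer that merges and clusters the lloid databases in
fixed-size chunks, repeating until one database remains per injection
run plus a single non-injection database.  The cluster/merge step now
runs directly off the lloid output ahead of the likelihood and
compute-FAR layers, and the now-unused --copy-raw-results and
--num-files-per-background-bin options are removed.

As a rough illustration of the reduction pattern (the names below are
hypothetical; the DAG builds the same shape out of dagparts.groups
chunks and gstlal_inspiral_merge_and_reduce jobs):

    def reduce_in_chunks(items, merge, chunk_size=10):
        """Repeatedly merge groups of chunk_size items until one remains."""
        while len(items) > 1:
            items = [merge(items[i:i + chunk_size])
                     for i in range(0, len(items), chunk_size)]
        return items[0]

    # toy usage: a fan-in-10 reduction tree over 100 inputs
    print(reduce_in_chunks(list(range(100)), merge=sum))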

---
 gstlal-inspiral/bin/gstlal_inspiral_dag | 354 +++++++-----------------
 1 file changed, 102 insertions(+), 252 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_inspiral_dag b/gstlal-inspiral/bin/gstlal_inspiral_dag
index af09299946..7c0be6d1ea 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_dag
+++ b/gstlal-inspiral/bin/gstlal_inspiral_dag
@@ -265,7 +265,6 @@ def parse_command_line():
 	# FIXME far-injections currently doesnt work, either fix it or delete it
 	#parser.add_option("--far-injections", action = "append", help = "Injection files with injections too far away to be seen and are not filtered. Required. See https://www.lsc-group.phys.uwm.edu/ligovirgo/cbcnote/NSBH/MdcInjections/MDC1 for example.")
 	parser.add_option("--singles-threshold", default=float("inf"), action = "store", metavar="THRESH", help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
-	parser.add_option("--copy-raw-results", default=False, action = "store_true", help = "Copy raw gstlal_inspiral results before applying clustering and other lossy operations.")
 	parser.add_option("--gzip-test", default=False, action = "store_true", help = "Perform gzip --test on all output files.")
 	parser.add_option("--verbose", action = "store_true", help = "Be verbose")
 	parser.add_option("--disable-calc-inj-snr", default=False, action = "store_true", help = "Disable injection SNR calculation")
@@ -287,7 +286,6 @@ def parse_command_line():
 	parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
 	parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
 	parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
-	parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
 	parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
 
 	# Condor commands
@@ -394,12 +392,6 @@ def inputs_to_db(jobs, inputs):
 	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
 	return os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
 
-def cache_to_db(cache, jobs):
-	hi_index = cache[-1].description.split('_')[0]
-	db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path))
-	db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
-	return db
-
 def get_rank_file(instruments, boundary_seg, n, basename, job=None):
 	if job:
 		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path)
@@ -460,6 +452,7 @@ def set_up_jobs(options):
 	jobs['injSplitter'] = dagparts.DAGJob("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = base_condor_commands)
 	jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
 	jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
+	jobs['mergeAndReduce'] = dagparts.DAGJob("gstlal_inspiral_merge_and_reduce", condor_commands = base_condor_commands)
 	jobs['calcLikelihoodInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands=base_condor_commands)
 	jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
 	jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
@@ -791,7 +784,7 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
 	# NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
 	lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)
 
-	return inspiral_nodes, lloid_output, lloid_diststats
+	return lloid_output, lloid_diststats
 
 def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs):
 	ligolw_add_nodes = []
@@ -956,29 +949,6 @@ def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
 	else:
 		return [], options.mass_model_file
 
-def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None):
-	"""merge and cluster from sqlite database
-	"""
-	if input_files:
-		input_ = {"": input_files}
-	else:
-		input_ = {}
-
-	# Merge database into chunks
-	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
-		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-		input_files = input_,
-		input_cache_files = {"input-cache": db_cache},
-		output_files = {"database":db},
-		input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
-	)
-
-	# cluster database
-	return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-		opts = {"sql-file": sqlfile, "tmp-space": dagparts.condor_scratch_space()},
-		input_files = {"": db}
-	)
-
 def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file): 
 	instruments = "".join(sorted(instrument_set))
 	margnodes = {}
@@ -1045,14 +1015,13 @@ def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument
 
 	return rankpdf_nodes, rankpdf_zerolag_nodes
 
-def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
+def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set):
 	likelihood_nodes = {}
 	instruments = "".join(sorted(instrument_set))
 	chunk_size = 16
 
 	bgbin_indices = sorted(lloid_output[None].keys())
 	for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
-		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, '%04d'%n)
 		inputs = [o[0] for o in outputs]
 
 		# Break up the likelihood jobs into chunks to process fewer files, e.g, 16
@@ -1095,232 +1064,112 @@ def merge_in_bin_layer(dag, jobs, options):
 	rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
 	rankpdf_zerolag_nodes = []
 	outnodes = {}
-	if options.num_files_per_background_bin == 1:
-		bgbin_lloid_map = {}
-		# Get list of all files for each background bin (will be same length for each bin)
-		for ce in map(CacheEntry, open(options.lloid_cache)):
-			bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
-
-		if len(bgbin_lloid_map.values()[0]) == 1:
-			# Starting with 1:1 mapping between files and bins,
-			# thus no merging is needed yet
-			outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
-			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-				outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
-
-		else:
-			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-				db = inputs_to_db(jobs, dbs)
-				sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options.cluster_sql_file)
-				outnodes.setdefault(None, []).append(sqlitenode)
-
-			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-				bgbin_lloid_map = {}
-				for ce in map(CacheEntry, open(inj_lloid_cache)):
-					bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
-
-				for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-					injdb = inputs_to_db(jobs, dbs)
-					sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, inputs, options.injection_sql_file)
-					outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
+	bgbin_lloid_map = {}
+	# Get list of all files for each background bin (will be same length for each bin)
+	for ce in map(CacheEntry, open(options.lloid_cache)):
+		bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+
+	if len(bgbin_lloid_map.values()[0]) == 1:
+		# Starting with 1:1 mapping between files and bins,
+		# thus no merging is needed yet
+		outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
+		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
+			outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
 
 	else:
-		# Starting with output of analysis before naming convention
-		# update (commit 5efd78fee6b371c999f510d07be33ec64f385695),
-		# so all lloid files contain gps times for entire run, and are
-		# numbered by iterating through segments in a given bin first
-		# (e.g. files 0000 to 0009 may all belong to bin 0000, then
-		# files 0010 to 0019 would all belong to bin 0001, etc)
-		for ce_list in dagparts.groups(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
-			noninjdb = cache_to_db(ce_list, jobs)
-			# merge all of the dbs from the same subbank
-			sqlitenode = merge_cluster_layer(dag, jobs, [], noninjdb, [ce.path for ce in ce_list], options)
+		for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
+			db = inputs_to_db(jobs, dbs)
+			sqlitenode = merge_cluster_layer(dag, jobs, [], db, dbs, options.cluster_sql_file)
 			outnodes.setdefault(None, []).append(sqlitenode)
 
 		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-			for ce_list in dagparts.groups(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
-				injdb = cache_to_db(ce_list, jobs)
-				# merge all of the dbs from the same subbank
-				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, [ce.path for ce in ce_list], options)
+			bgbin_lloid_map = {}
+			for ce in map(CacheEntry, open(inj_lloid_cache)):
+				bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+
+			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
+				injdb = inputs_to_db(jobs, dbs)
+				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, dbs, options.injection_sql_file)
 				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
 
 	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
 
-def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments):
-	innodes = {}
-
-	# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
-	files_to_group = 40
-	for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
-		if inj is None:
-			innode_key = None
-			sql_file = options.cluster_sql_file
-		else:
-			innode_key = sim_tag_from_inj_file(inj)
-			sql_file = options.injection_sql_file
-
-		# Flatten the nodes for this sub bank
-		nodes = dagparts.flatten(nodes)
-		merge_nodes = []
-		# Flatten the input/output files from calc_likelihood
-		inputs = dagparts.flatten([node.input_files["input-cache"] for node in nodes])
-
-		# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
-		for input_files in dagparts.groups(inputs, files_to_group):
-			merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
-				opts = {"sql-file": sql_file, "tmp-space": dagparts.condor_scratch_space()},
-				input_files = {"": input_files}
-				)
-			)
-			if options.copy_raw_results:
-				merge_nodes[-1].set_pre_script("store_raw.sh")
-				merge_nodes[-1].add_pre_script_arg(" ".join(input_files))
-
-		# Merging all the dbs from the same sub bank
-		for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
-			db = inputs_to_db(jobs, inputs)
-			sqlitenode = merge_cluster_layer(dag, jobs, merge_nodes, db, inputs, sql_file)
-			innodes.setdefault(innode_key, []).append(sqlitenode)
-
-	# make sure outnodes has a None key, even if its value is an empty list
-	innodes.setdefault(None, [])
-
-	num_chunks = 50
-
-	if options.vetoes is None:
-		vetoes = []
-	else:
-		vetoes = [options.vetoes]
-
-	chunk_nodes = []
+def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg):
+	"""cluster and merge lloid output into one final database per injection run plus a single non-injection database
+	"""
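+	# NOTE: the lloid databases are merged in a reduction tree: each pass
+	# groups at most chunk_size files into one mergeAndReduce job, and the
+	# loop repeats on the merged outputs until a single database remains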
+	sqlite_nodes = {}
 	dbs_to_delete = []
-	# Process the chirp mass bins in chunks to paralellize the merging process
-	for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
-		try:
-			dbs = [node.input_files[""] for node in nodes]
-			parents = nodes
+	chunk_size = 10
 
-		except AttributeError:
-			# analysis started at merger step but seeded by lloid files which
-			# have already been merged into one file per background
-			# bin, thus the analysis will begin at this point
-			dbs = nodes
-			parents = []
+	# define lloid files
+	bgbin_indices = sorted(lloid_output[None].keys())
+	output_lloid = [(file_, node) for key in bgbin_indices for file_, node in lloid_output[None][key]]
+
+	### cluster and merge
+	while len(output_lloid) > 1:
+		input_lloid = output_lloid
+		output_lloid = []
+		for lloid_chunk in dagparts.groups(input_lloid, chunk_size):
+			cache, nodes = zip(*lloid_chunk)
+			nodes = list(set(dagparts.flatten(nodes)))
+
+			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in cache]
+			noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['mergeAndReduce'].output_path)
+
+			# Break up the merge/reduce work into jobs that each process at most chunk_size files, e.g. 10
+			mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
+				parent_nodes = nodes,
+				opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
+				input_files = {},
+				input_cache_files = {"input-cache": cache},
+				output_files = {"database": noninjdb},
+			)
 
-		dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-		noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
+			dbs_to_delete.append(noninjdb)
+			output_lloid.append((noninjdb, [mergenode]))
 
-		# Merge and cluster the final non injection database
-		noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file)
-		chunk_nodes.append(noninjsqlitenode)
-		dbs_to_delete.append(noninjdb)
+	# the reduction leaves a single remaining (file, nodes) pair; record its database as the final non-injection database
+	noninjdb = output_lloid[0][0]
+	sqlite_nodes[None] = [o[1] for o in output_lloid]
 
-	# Merge the final non injection database
-	outnodes = []
+	# then injections
 	injdbs = []
-	if options.non_injection_db: #### injection-only run
-		noninjdb = options.non_injection_db
-	else:
-		final_nodes = []
-		for chunk, nodes in enumerate(dagparts.groups(innodes[None], 10)):
-			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
-
-			# cluster the final non injection database
-			noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
-			final_nodes.append(noninjsqlitenode)
-
-		input_files = (vetoes + [options.frame_segments_file])
-		input_cache_files = [node.input_files[""] for node in final_nodes]
-		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
-		noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
-
-		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode],
-			input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
-		)
-
-		outnodes.append(cpnode)
-
-	# FIXME far-injections currently doesnt work, either fix it or delete it
-	#for injections, far_injections in zip(options.injections, options.far_injections):
-	if options.injections:
-		iterable_injections = options.injections
-	else:
-		iterable_injections = options.injections_for_merger
-
-	for injections in iterable_injections:
-		# extract only the nodes that were used for injections
-		chunk_nodes = []
-
-		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
-			try:
-				dbs = [injnode.input_files[""] for injnode in injnodes]
-				parents = injnodes
-			except AttributeError:
-				dbs = injnodes
-				parents = []
-
-			# Setup the final output names, etc.
-			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-			injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
-
-			# merge and cluster
-			clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file)
-			chunk_nodes.append(clusternode)
-			dbs_to_delete.append(injdb)
-
-
-		final_nodes = []
-		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
-			# Setup the final output names, etc.
-			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
+	for inj in (options.injections or options.injections_for_merger or []):
+		sim_tag = sim_tag_from_inj_file(inj)
+
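+		# same chunked reduction as the non-injection case, but clustered
+		# with the injection sql file and keyed by this injection's sim tag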
+		# define lloid files
+		bgbin_indices = sorted(lloid_output[sim_tag].keys())
+		output_lloid = [(file_, node) for key in bgbin_indices for file_, node in lloid_output[sim_tag][key]]
+
+		### cluster and merge
+		while len(output_lloid) > 1:
+			input_lloid = output_lloid
+			output_lloid = []
+			for lloid_chunk in dagparts.groups(input_lloid, chunk_size):
+				cache, nodes = zip(*lloid_chunk)
+				nodes = list(set(dagparts.flatten(nodes)))
+
+				dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in cache]
+				injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['mergeAndReduce'].output_path)
+
+				# Break up the merge/reduce work into jobs that each process at most chunk_size files, e.g. 10
+				mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
+					parent_nodes = nodes,
+					opts = {"sql-file": options.injection_sql_file, "tmp-space": dagparts.condor_scratch_space()},
+					input_files = {},
+					input_cache_files = {"input-cache": cache},
+					output_files = {"database": injdb},
+				)
 
-			# merge and cluster
-			clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.cluster_sql_file)
-			final_nodes.append(clusternode)
+				dbs_to_delete.append(injdb)
+				output_lloid.append((injdb, [mergenode]))
 
-		# Setup the final output names, etc.
-		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
+		# the reduction leaves a single remaining (file, nodes) pair; record its database as this injection run's final database
+		injdb = output_lloid[0][0]
+		sqlite_nodes[sim_tag] = [o[1] for o in output_lloid]
 		injdbs.append(injdb)
-		injxml = injdb.replace('.sqlite','.xml.gz')
-
-		# FIXME far-injections currently doesnt work, either fix it or delete it
-		## If there are injections that are too far away to be seen in a separate file, add them now.
-		#if far_injections is not None:
-		#	xml_input = [injxml] + [far_injections]
-		#else:
-		#	xml_input = injxml
-		xml_input = injxml
-
-		# merge and cluster
-		parent_nodes = final_nodes + ligolw_add_nodes
-		input_files = (vetoes + [options.frame_segments_file, injections])
-		input_cache_files = [node.input_files[""] for node in final_nodes]
-		clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
-
-		clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
-			opts = {"tmp-space":dagparts.condor_scratch_space()},
-			output_files = {"extract":injxml},
-			input_files = {"database":injdb}
-		)
-
-		inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode],
-			opts = {"time-window":0.9},
-			input_files = {"":injxml}
-		)
 
-		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode],
-			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-			output_files = {"database":injdb},
-			input_files = {"":xml_input}
-		)
-
-		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode],
-			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
-		)
-
-		outnodes.append(cpnode)
+	# exclude the final output databases from the list of intermediate databases to delete
+	dbs_to_delete = [db for db in dbs_to_delete if db not in injdbs and db != noninjdb]
 
-	return injdbs, noninjdb, outnodes, dbs_to_delete
+	return sqlite_nodes, injdbs, noninjdb, dbs_to_delete
 
 def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 	ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes]
@@ -1370,19 +1219,20 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 
 	return final_margnodes, dagparts.flatten(all_margcache)
 
-def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes):
+def compute_far_layer(dag, jobs, likelihood_nodes, final_margnodes, injdbs, noninjdb):
 	"""compute FAPs and FARs
 	"""
+	all_likelihood_nodes = dagparts.flatten(dagparts.flatten(likelihood_nodes.values()))
 	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
 	filesuffixs = ['', '_with_zerolag']
 
-	for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
+	for margnode, margfile, filesuffix in zip(final_margnodes, margfiles, filesuffixs):
 		if options.marginalized_likelihood_file: ### injection-only run
-			parents = final_sqlite_nodes
+			parents = all_likelihood_nodes
 			marginalized_likelihood_file = margfile
 
 		else:
-			parents = [margnode] + final_sqlite_nodes
+			parents = [margnode] + all_likelihood_nodes
 			marginalized_likelihood_file = margnode.output_files["output"]
 
 		farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents,
@@ -1514,7 +1364,7 @@ if __name__ == '__main__':
 
 	if not options.lloid_cache:
 		# Inspiral jobs by segment
-		inspiral_nodes, lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
+		lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
 
 		# marginalize jobs
 		marg_nodes = marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file)
@@ -1524,20 +1374,20 @@ if __name__ == '__main__':
 
 	else:
 		# Merge lloid files into 1 file per bin if not already 1 file per bin
-		rankpdf_nodes, rankpdf_zerolag_nodes, likelihood_nodes = merge_in_bin_layer(dag, jobs, options)
+		lloid_output = retrieve_lloid_output(dag, jobs, options)
+		rankpdf_nodes, rankpdf_zerolag_nodes, marg_nodes = merge_in_bin_layer(dag, jobs, options)
+
+	# Setup clustering and/or merging
+	sqlite_nodes, injdbs, noninjdb, dbs_to_delete = cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg)
 
 	# final marginalization step
 	final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes)
 
 	# likelihood jobs
-	likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set)
-
-	# Setup clustering and/or merging
-	# after all of the likelihood ranking and preclustering is finished put everything into single databases based on the injection file (or lack thereof)
-	injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments)
+	likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set)
 
 	# Compute FAR
-	farnode = compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes)
+	farnode = compute_far_layer(dag, jobs, likelihood_nodes, final_marg_nodes, injdbs, noninjdb)
 
 	# make summary plots
 	plotnodes = summary_plot_layer(dag, jobs, farnode, options)
-- 
GitLab