From f3ee4be4b79b7d72b62660fb80f59ff516b43c0e Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Thu, 6 Jun 2019 07:28:26 -0700
Subject: [PATCH] gstlal_inspiral_dag: DAG reorganization to allow
 post-processing of triggers

---
 gstlal-inspiral/bin/gstlal_inspiral_dag | 91 ++++++++++++++-----------
 1 file changed, 50 insertions(+), 41 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_inspiral_dag b/gstlal-inspiral/bin/gstlal_inspiral_dag
index 7c0be6d1ea..23b505c082 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_dag
+++ b/gstlal-inspiral/bin/gstlal_inspiral_dag
@@ -362,6 +362,8 @@ def parse_command_line():
 
 	#FIXME a hack to find the sql paths
 	share_path = os.path.split(dagparts.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal')
+	options.snr_cluster_sql_file = os.path.join(share_path, 'snr_simplify_and_cluster.sql')
+	options.inj_snr_cluster_sql_file = os.path.join(share_path, 'inj_snr_simplify_and_cluster.sql')
 	options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
 	options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')
 	options.injection_proc_sql_file = os.path.join(share_path, 'simplify_proc_table_in_inj_file.sql')
@@ -421,7 +423,7 @@ def set_up_jobs(options):
 		jobs['createPriorDistStats'] = None
 		jobs['calcRankPDFs'] = None
 		jobs['calcRankPDFsWithZerolag'] = None
-		jobs['calcLikelihood'] = None
+		jobs['calcLikelihoodByBin'] = None
 		jobs['marginalize'] = None
 		jobs['marginalizeWithZerolag'] = None
 
@@ -443,7 +445,7 @@ def set_up_jobs(options):
 		jobs['createPriorDistStats'] = dagparts.DAGJob("gstlal_inspiral_create_prior_diststats", condor_commands = base_condor_commands)
 		jobs['calcRankPDFs'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", condor_commands = calc_rank_pdf_condor_commands)
 		jobs['calcRankPDFsWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", tag_base="gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands=calc_rank_pdf_condor_commands)
-		jobs['calcLikelihood'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", condor_commands = base_condor_commands)
+		jobs['calcLikelihoodByBin'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", condor_commands = base_condor_commands)
 		jobs['marginalize'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = base_condor_commands)
 		jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base="gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands=base_condor_commands)
 
@@ -453,7 +455,7 @@ def set_up_jobs(options):
 	jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
 	jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
 	jobs['mergeAndReduce'] = dagparts.DAGJob("gstlal_inspiral_merge_and_reduce", condor_commands = base_condor_commands)
-	jobs['calcLikelihoodInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands=base_condor_commands)
+	jobs['calcLikelihoodByBinInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", tag_base='gstlal_inspiral_calc_likelihood_by_bin_inj', condor_commands=base_condor_commands)
 	jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
 	jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
 	jobs['toSqlite'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = base_condor_commands)
@@ -1020,43 +1022,50 @@ def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb
 	instruments = "".join(sorted(instrument_set))
 	chunk_size = 16
 
-	bgbin_indices = sorted(lloid_output[None].keys())
-	for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
-		inputs = [o[0] for o in outputs]
+	# convert final db to ligolw
+	noninjxml = noninjdb.replace('.sqlite','.xml.gz')
+	dbnode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = final_sqlite_nodes[None],
+		opts = {"tmp-space": dagparts.condor_scratch_space()},
+		output_files = {"extract": noninjxml},
+		input_files = {"database": noninjdb}
+	)
 
-		# Break up the likelihood jobs into chunks to process fewer files, e.g, 16
-		likelihood_nodes.setdefault(None,[]).append(
-			[dagparts.DAGNode(jobs['calcLikelihood'], dag,
-				parent_nodes = [marg_nodes[bgbin_index]],
-				opts = {"tmp-space": dagparts.condor_scratch_space()},
-				input_files = {"likelihood-url": marg_nodes[bgbin_index].output_files["output"]},
-				input_cache_files = {"input-cache": chunked_inputs}
-				) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
-			)
+	parent_node = dbnode
+	for bgbin_index in sorted(lloid_output[None].keys()):
+		parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
+			parent_nodes = [parent_node, marg_nodes[bgbin_index]],
+			opts = {"tmp-space": dagparts.condor_scratch_space()},
+			input_files = {
+				"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
+				"": dbnode.output_files["extract"],
+			}
+		)
+	likelihood_nodes[None] = [parent_node]
 
 	# then injections
-	for inj in options.injections:
+	for inj, injdb in zip(options.injections, injdbs):
 		lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
-		bgbin_indices = sorted(lloid_nodes.keys())
-		for n, (outputs, diststats, bgbin_index) in enumerate((lloid_nodes[key], lloid_diststats[key], key) for key in bgbin_indices):
-			if outputs is not None:
-				inputs = [o[0] for o in outputs]
-				parents = dagparts.flatten([o[1] for o in outputs])
-				if marg_nodes[bgbin_index]:
-					parents.append(marg_nodes[bgbin_index])
-					likelihood_url = marg_nodes[bgbin_index].output_files["output"]
-				else:
-					likelihood_url = diststats[0]
-
-				# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
-				likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
-					[dagparts.DAGNode(jobs['calcLikelihoodInj'], dag,
-						parent_nodes = parents,
-						opts = {"tmp-space": dagparts.condor_scratch_space()},
-						input_files = {"likelihood-url": likelihood_url},
-						input_cache_files = {"input-cache": chunked_inputs}
-						) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
-					)
+
+		# convert final db to ligolw
+		injxml = injdb.replace('.sqlite','.xml.gz')
+		injdbnode = dagparts.DAGNode(jobs['toXML'], dag,
+			parent_nodes = final_sqlite_nodes[sim_tag_from_inj_file(inj)],
+			opts = {"tmp-space": dagparts.condor_scratch_space()},
+			output_files = {"extract": injxml},
+			input_files = {"database": injdb}
+		)
+
+		parent_node = injdbnode
+		for bgbin_index in sorted(lloid_nodes.keys()):
+			parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
+				parent_nodes = [parent_node, marg_nodes[bgbin_index]],
+				opts = {"tmp-space": dagparts.condor_scratch_space()},
+				input_files = {
+					"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
+					"": injdbnode.output_files["extract"],
+				}
+			)
+		likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
 
 	return likelihood_nodes
 
@@ -1120,7 +1129,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
 			# Break up the merge/reduce jobs into chunks to process fewer files, e.g, 10
 			mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
 				parent_nodes = nodes,
-				opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
+				opts = {"sql-file": options.snr_cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
 				input_files = {},
 				input_cache_files = {"input-cache": cache},
 				output_files = {"database": noninjdb},
@@ -1129,7 +1138,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
 			dbs_to_delete.append(noninjdb)
 			output_lloid.append((noninjdb, [mergenode]))
 
-	sqlite_nodes[None] = [o[1] for o in output_lloid]
+	sqlite_nodes[None] = dagparts.flatten([o[1] for o in output_lloid])
 
 	# then injections
 	injdbs = []
@@ -1154,7 +1163,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
 				# Break up the merge/reduce jobs into chunks to process fewer files, e.g, 10
 				mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
 					parent_nodes = nodes,
-					opts = {"sql-file": options.injection_sql_file, "tmp-space": dagparts.condor_scratch_space()},
+					opts = {"sql-file": options.inj_snr_cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
 					input_files = {},
 					input_cache_files = {"input-cache": cache},
 					output_files = {"database": injdb},
@@ -1163,7 +1172,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
 				dbs_to_delete.append(injdb)
 				output_lloid.append((injdb, [mergenode]))
 
-		sqlite_nodes[sim_tag] = [o[1] for o in output_lloid]
+		sqlite_nodes[sim_tag] = dagparts.flatten([o[1] for o in output_lloid])
 		injdbs.append(injdb)
 
 	# remove final databases from databases considered for deletion
@@ -1222,7 +1231,7 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 def compute_far_layer(dag, jobs, likelihood_nodes, final_margnodes, injdbs, noninjdb):
 	"""compute FAPs and FARs
 	"""
-	all_likelihood_nodes = dagparts.flatten(dagparts.flatten(likelihood_nodes.values()))
+	all_likelihood_nodes = dagparts.flatten(likelihood_nodes.values())
 	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
 	filesuffixs = ['', '_with_zerolag']
 
-- 
GitLab