From 3c6c1b250cef6cb617a1ea842e2672d0e57e73f0 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Sat, 11 May 2019 16:28:54 -0700
Subject: [PATCH] gstlal_inspiral_pipe: factor out merge/cluster step into
 merge_cluster_layer()

---
 gstlal-inspiral/bin/gstlal_inspiral_pipe | 233 +++++++----------------
 1 file changed, 64 insertions(+), 169 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_inspiral_pipe b/gstlal-inspiral/bin/gstlal_inspiral_pipe
index 7f6e56b55d..cd8ade8b60 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_inspiral_pipe
@@ -195,6 +195,20 @@ def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
 
 	return lloid_output, lloid_diststats
 
+def set_up_scripts(options):
+	# Make an xml integrity checker
+	if options.gzip_test:
+		with open("gzip_test.sh", "w") as f:
+			f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
+		os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
+
+	# A pre script to backup data before feeding to lossy programs
+	# (e.g. clustering routines)
+	with open("store_raw.sh", "w") as f:
+		f.write("""#!/bin/bash
+		for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
+	os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
+
 
 #----------------------------------------------------------
 ### command line options
@@ -352,7 +366,7 @@ def parse_command_line():
 
 
 #----------------------------------------------------------
-### DAG setup utilities
+### DAG utilities
 
 def set_up_jobs(options):
 	jobs = {}
@@ -427,21 +441,6 @@ def set_up_jobs(options):
 
 	return jobs
 
-def set_up_scripts(options):
-	# Make an xml integrity checker
-	if options.gzip_test:
-		with open("gzip_test.sh", "w") as f:
-			f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
-		os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
-
-	# A pre script to backup data before feeding to lossy programs
-	# (e.g. clustering routines)
-	with open("store_raw.sh", "w") as f:
-		f.write("""#!/bin/bash
-		for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
-	os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
-
-
 #----------------------------------------------------------
 ### DAG layers
 
@@ -943,6 +942,20 @@ def model_layer(dag, jobs, parent_nodes, instruments, options, seg, template_ban
 	else:
 		return [], options.mass_model_file
 
+def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, options):
+	# Merge database into chunks
+	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
+		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+		input_cache_files = {"input-cache": db_cache},
+		output_files = {"database":db},
+		input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
+	)
+
+	# cluster database
+	return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
+		opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
+		input_files = {"": db}
+	)
 
 def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, mass_model_add_node, mass_model_file):
 	likelihood_nodes = {}
@@ -1041,62 +1054,27 @@ def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, llo
 		merge_nodes = []
 		# Flatten the input/output files from calc_likelihood
 		inputs = flatten([node.input_files["input-cache"] for node in nodes])
-		if inj is None:
-			# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
-			for n in range(0, len(inputs), files_to_group):
-				merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
-					opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-					input_files = {"":inputs[n:n+files_to_group]}
-					)
-				)
-				if options.copy_raw_results:
-					merge_nodes[-1].set_pre_script("store_raw.sh")
-					merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
 
-			# Merging all the dbs from the same sub bank
-			for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
-				dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
-				db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
-				db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
-				sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = merge_nodes,
-					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-					input_cache_files = {"input-cache":inputs},
-					output_files = {"database":db},
-					input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
-				)
-				sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-					opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-					input_files = {"":db}
-				)
-				outnodes.setdefault(None, []).append(sqlitenode)
-		else:
-			# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
-			for n in range(0, len(inputs), files_to_group):
-				merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
-					opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-					input_files = {"":inputs[n:n+files_to_group]}
-					)
-				)
-				if options.copy_raw_results:
-					merge_nodes[-1].set_pre_script("store_raw.sh")
-					merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
-
-			# Merging all the dbs from the same sub bank and injection run
-			for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
-				injdb_files = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
-				injdb = dagparts.group_T050017_filename_from_T050017_files(injdb_files, '.sqlite')
-				injdb = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(injdb).description[:4]]), injdb)
-				sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = merge_nodes,
-					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-					input_cache_files = {"input-cache":inputs},
-					output_files = {"database":injdb},
-					input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
-				)
-				sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-					opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-					input_files = {"":injdb}
+		# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
+		for n in range(0, len(inputs), files_to_group):
+			merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
+				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+				input_files = {"":inputs[n:n+files_to_group]}
 				)
-				outnodes.setdefault(sim_tag_from_inj_file(inj), []).append(sqlitenode)
+			)
+			if options.copy_raw_results:
+				merge_nodes[-1].set_pre_script("store_raw.sh")
+				merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
+
+		# Merging all the dbs from the same sub bank
+		outnode_key = sim_tag_from_inj_file(inj) if inj is not None else None
+		for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
+			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
+			db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
+			db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
+
+			sqlitenode = merge_cluster_layer(dag, jobs, merge_nodes, db, inputs, options)
+			outnodes.setdefault(outnode_key, []).append(sqlitenode)
 
 	# make sure outnodes has a None key, even if its value is an empty list
 	outnodes.setdefault(None, [])
@@ -1123,21 +1101,12 @@ def merge_in_bin_layer(dag, jobs, options):
 		else:
 			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
 				dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-				noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
-				# merge all of the dbs from the same subbank
-				sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
-					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-					input_cache_files = {"input-cache":dbs},
-					output_files = {"database":noninjdb},
-					input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
-				)
-
-				sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-					opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-					input_files = {"":noninjdb}
-				)
+				db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
+				db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
 
+				sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options)
 				outnodes.setdefault(None, []).append(sqlitenode)
+
 			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
 				bgbin_lloid_map = {}
 				for ce in map(CacheEntry, open(inj_lloid_cache)):
@@ -1145,20 +1114,10 @@ def merge_in_bin_layer(dag, jobs, options):
 
 				for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
 					dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-					injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
-					# merge all of the dbs from the same subbank
-					sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
-						opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-						input_cache_files = {"input-cache":dbs},
-						output_files = {"database":injdb},
-						input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
-					)
-
-					sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-						opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-						input_files = {"":injdb}
-					)
+					db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
+					db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
 
+					sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options)
 					outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
 
 	else:
@@ -1173,18 +1132,7 @@ def merge_in_bin_layer(dag, jobs, options):
 			noninjdb = os.path.join(jobs['toSqlite'].output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
 
 			# merge all of the dbs from the same subbank
-			sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
-				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-				input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
-				output_files = {"database":noninjdb},
-				input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
-			)
-
-			sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-				input_files = {"":noninjdb}
-			)
-
+			sqlitenode = merge_cluster_layer(dag, jobs, [], noninjdb, [ce.path for ce in ce_list], options)
 			outnodes.setdefault(None, []).append(sqlitenode)
 
 		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
@@ -1193,18 +1141,7 @@ def merge_in_bin_layer(dag, jobs, options):
 				injdb = os.path.join(jobs['toSqlite'].output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
 
 				# merge all of the dbs from the same subbank
-				sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
-					opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-					input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
-					output_files = {"database":injdb},
-					input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
-				)
-
-				sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-					opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-					input_files = {"":injdb}
-				)
-
+				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, [ce.path for ce in ce_list], options)
 				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
 
 	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
@@ -1232,21 +1169,11 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
 			dbs = nodes
 			parents = []
 
-		# Merge the final non injection database into chunks
 		dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
 		noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
-		sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parents,
-			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-			input_cache_files = {"input-cache": dbs},
-			output_files = {"database":noninjdb},
-			input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
-		)
 
-		# cluster the final non injection database
-		noninjsqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-			opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-			input_files = {"":noninjdb}
-		)
+		# Merge and cluster the final non injection database
+		noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options)
 		chunk_nodes.append(noninjsqlitenode)
 		dbs_to_delete.append(noninjdb)
 
@@ -1261,18 +1188,9 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
 		final_nodes = []
 		for chunk, nodes in enumerate(chunks(innodes[None], 10)):
 			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
-			sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = nodes,
-				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-				input_cache_files = {"input-cache": [node.input_files[""] for node in nodes]},
-				output_files = {"database":noninjdb},
-				input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
-			)
 
 			# cluster the final non injection database
-			noninjsqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-				input_files = {"":noninjdb}
-			)
+			noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options)
 			final_nodes.append(noninjsqlitenode)
 
 		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
@@ -1319,20 +1237,8 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
 			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
 			injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
 
-			# merge
-			sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parents,
-				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-				input_cache_files = {"input-cache":dbs},
-				output_files = {"database":injdb},
-				input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
-			)
-
-			# cluster
-			clusternode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-				input_files = {"":injdb}
-			)
-
+			# merge and cluster
+			clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options)
 			chunk_nodes.append(clusternode)
 			dbs_to_delete.append(injdb)
 
@@ -1342,19 +1248,8 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
 			# Setup the final output names, etc.
 			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
 
-			# merge
-			sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = injnodes,
-				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-				input_cache_files = {"input-cache": [node.input_files[""] for node in injnodes]},
-				output_files = {"database":injdb},
-				input_cache_file_name = injdb.replace('.sqlite','.cache')
-			)
-
-			# cluster
-			clusternode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-				input_files = {"":injdb}
-			)
+			# merge and cluster
+			clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options)
 			final_nodes.append(clusternode)
 
 		# Setup the final output names, etc.
-- 
GitLab