From 83ede53b6722b68db7440b09c6a114600708f095 Mon Sep 17 00:00:00 2001
From: "patrick.godwin" <patrick.godwin@ligo.org>
Date: Wed, 4 Sep 2019 13:06:19 -0400
Subject: [PATCH] gstlal_inspiral_pipe, inspiral_pipe.py: fixes to allow
 generation of injection-only DAGs

---
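Notes: the new --injection-only flag replaces the previous implicit test on
--dist-stats-cache when deciding which jobs to build. A minimal sketch of the
"required iff injection-only" validation the flag implies (the standalone
helper and its error text are hypothetical; the option names mirror the
patch and the existing parse_command_line() checks):

    def validate_injection_only(options):
        # Caches an injection-only run must be seeded with (see the
        # "required iff running injection-only analysis" options above).
        required = ("dist_stats_cache", "svd_bank_cache", "psd_cache",
                    "marginalized_likelihood_file",
                    "marginalized_likelihood_with_zerolag_file")
        missing = [opt for opt in required if getattr(options, opt) is None]
        if options.injection_only and missing:
            raise ValueError("injection-only run is missing --%s" %
                ", --".join(opt.replace("_", "-") for opt in missing))
        if not options.injection_only and options.bank_cache is None:
            # a full run builds its own SVD banks from the bank cache
            raise ValueError("--bank-cache is required unless --injection-only is given")
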
 gstlal-inspiral/bin/gstlal_inspiral_pipe | 57 +++++++-----------------
 gstlal-inspiral/python/inspiral_pipe.py  | 33 +++++++++-----
 2 files changed, 38 insertions(+), 52 deletions(-)
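
In compute_far_layer, zip() stops at the shortest input, so an empty margnodes
list would silently skip both the '' and '_with_zerolag' iterations; padding
with [None, None] keeps one pass per suffix while the marginalized likelihood
files come from the command line instead. A self-contained illustration of
that behaviour (the file names are placeholders, not pipeline output):

    # zip() truncates to the shortest sequence: with margnodes == [] the
    # loop body never runs, hence the [None, None] padding in the patch.
    margnodes = []
    margfiles = ["marg.xml.gz", "marg_with_zerolag.xml.gz"]  # placeholders
    filesuffixs = ['', '_with_zerolag']
    if not margnodes:  # injection-only run: no marginalize nodes in the DAG
        margnodes = [None, None]
    for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
        assert margnode is None  # take the likelihood file from options
        print("using %s for suffix %r" % (margfile, filesuffix))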

diff --git a/gstlal-inspiral/bin/gstlal_inspiral_pipe b/gstlal-inspiral/bin/gstlal_inspiral_pipe
index cbf2b0258e..b403448d90 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_inspiral_pipe
@@ -128,6 +128,7 @@ def parse_command_line():
 	parser.add_option("--injections", action = "append", help = "append injection files to analyze. Must prepend filename with X:Y:, where X and Y are floats, e.g. 1.2:3.1:filename, so that the injections are only searched for in regions of the template bank with X <= chirp mass < Y.")
 
 	# Data from a zero lag run in the case of an injection-only run.
+	parser.add_option("--injection-only", default=False, action = "store_true", help = "Run an injection only analysis.")
 	parser.add_option("--dist-stats-cache", metavar = "filename", help = "Set the cache file for dist stats (required iff running injection-only analysis)")
 	parser.add_option("--svd-bank-cache", metavar = "filename", help = "Set the cache file for svd banks (required iff running injection-only analysis)")
 	parser.add_option("--psd-cache", metavar = "filename", help = "Set the cache file for psd (required iff running injection-only analysis)")
@@ -135,13 +136,6 @@ def parse_command_line():
 	parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
 	parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")
 
-	# Data from a previous run in the case of a run that starts at the merger step
-	parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
-	parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
-	parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
-	parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
-	parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
-
 	# Condor commands
 	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
 	parser.add_option("--max-inspiral-jobs", type="int", metavar = "jobs", help = "Set the maximum number of gstlal_inspiral jobs to run simultaneously, default no constraint.")
@@ -191,21 +185,10 @@ def parse_command_line():
 	if len(missing_injection_options) == 0 and options.num_banks:
 		raise ValueError("cant specify --num-banks in injection-only run")
 
-	missing_merger_options = []
-	for option in ("lloid_cache", "inj_lloid_cache", "rank_pdf_cache"):
-		if getattr(options, option) is None:
-			missing_merger_options.append(option)
-	if len(missing_injection_options) > 0 and len(missing_injection_options) < 3:
-		raise ValueError("missing merger-run options %s." % ", ".join([option for option in missing_injection_options]))
-	if len(missing_injection_options) == 0 and options.num_banks:
-		raise ValueError("cant specify --num-banks in a run starting at the merger step")
-
 	fail = ""
 	required_options = []
-	if len(missing_merger_options) == 3:
-		required_options.append("reference_likelihood_file")
-		if len(missing_injection_options) == 6:
-			required_options.append("bank_cache")
+	if len(missing_injection_options) == 6:
+		required_options.append("bank_cache")
 
 	for option in required_options:
 		if getattr(options, option) is None:
@@ -275,8 +258,7 @@ def set_up_jobs(options):
 	inspiral_2ifo_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inspiral_2ifo_condor_opts)
 	inspiral_3ifo_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inspiral_3ifo_condor_opts)
 
-	if options.dist_stats_cache:
-		# injection-only run
+	if options.injection_only:
 		jobs['gstlalInspiral1IFO'] = None
 		jobs['gstlalInspiral2IFO'] = None
 		jobs['gstlalInspiral3IFO'] = None
@@ -287,13 +269,7 @@ def set_up_jobs(options):
 		jobs['marginalize'] = None
 		jobs['marginalizeWithZerolag'] = None
 
-	elif options.lloid_cache:
-		# analysis starting at merger step
-		jobs['marginalize'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands=base_condor_commands)
-		jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base="gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands=base_condor_commands)
-
 	else:
-		# set up jobs only needed for zerolag run
 		jobs['refPSD'] = dagparts.DAGJob("gstlal_reference_psd", condor_commands = ref_psd_condor_commands)
 		jobs['medianPSD'] = dagparts.DAGJob("gstlal_median_of_psds", condor_commands = base_condor_commands)
 		jobs['plotBanks'] = dagparts.DAGJob("gstlal_inspiral_plot_banks", condor_commands = base_condor_commands)
@@ -380,7 +356,7 @@ if __name__ == '__main__':
 			template_mchirp_dict, bank_cache, max_time = inspiral_pipe.get_bank_params(options)
 			instrument_set = bank_cache.keys()
 
-		elif options.psd_cache:
+		if options.psd_cache:
 			template_mchirp_dict, svd_nodes, max_time = inspiral_pipe.get_svd_bank_params(options.svd_bank_cache)
 			instrument_set = svd_nodes.keys()
 
@@ -390,10 +366,7 @@ if __name__ == '__main__':
 	if options.psd_cache:
 		### reference psd jobs
 		psd_nodes, ref_psd_parent_nodes = inspiral_pipe.inj_psd_layer(segsdict, options)
-
-	elif options.lloid_cache:
-		# starting analysis at merger step, nothing to do here
-		pass
+		ref_psd = inspiral_pipe.load_reference_psd(options)
 
 	elif options.reference_psd is None:
 		# Compute the PSDs for each segment
@@ -413,12 +386,12 @@ if __name__ == '__main__':
 		ref_psd_parent_nodes = []
 
 	# Calculate Expected SNR jobs
-	if not options.lloid_cache and not options.disable_calc_inj_snr:
+	if not options.disable_calc_inj_snr:
 		ligolw_add_nodes = inspiral_pipe.expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100)
 	else:
 		ligolw_add_nodes = []
 
-	if options.bank_cache:
+	if not options.injection_only:
 		# Compute SVD banks
 		svd_nodes, template_mchirp_dict, svd_dtdphi_map = inspiral_pipe.svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, output_dir, template_mchirp_dict)
 
@@ -427,16 +400,16 @@ if __name__ == '__main__':
 	else:
 		model_node = None
 		model_file = options.mass_model_file
+		svd_dtdphi_map, _ = inspiral_pipe.load_svd_dtdphi_map(options)
 
-	if not options.lloid_cache:
-		# Inspiral jobs by segment
-		inspiral_nodes, lloid_output, lloid_diststats = inspiral_pipe.inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
+	# Inspiral jobs by segment
+	inspiral_nodes, lloid_output, lloid_diststats = inspiral_pipe.inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
 
-		# marginalize jobs
-		marg_nodes = inspiral_pipe.marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map)
+	# marginalize jobs
+	marg_nodes = inspiral_pipe.marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map)
 
-		# calc rank PDF jobs
-		rankpdf_nodes, rankpdf_zerolag_nodes = inspiral_pipe.calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
+	# calc rank PDF jobs
+	rankpdf_nodes, rankpdf_zerolag_nodes = inspiral_pipe.calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
 
 	# final marginalization step
 	final_marg_nodes, margfiles_to_delete = inspiral_pipe.final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, options)
diff --git a/gstlal-inspiral/python/inspiral_pipe.py b/gstlal-inspiral/python/inspiral_pipe.py
index 86ca87322b..3c084776aa 100644
--- a/gstlal-inspiral/python/inspiral_pipe.py
+++ b/gstlal-inspiral/python/inspiral_pipe.py
@@ -711,8 +711,12 @@ def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, optio
 				inputs = [o[0] for o in outputs]
 				parents = dagparts.flatten([o[1] for o in outputs])
 
-				parents.append(marg_nodes[bin_key])
-				likelihood_url = marg_nodes[bin_key].output_files["output"]
+				if bin_key in marg_nodes:
+					parents.append(marg_nodes[bin_key])
+					likelihood_url = marg_nodes[bin_key].output_files["output"]
+				else:
+					likelihood_url = lloid_diststats[bin_key][0]
+
 				likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, likelihood_url, parents)
 
 	return likelihood_nodes
@@ -728,6 +732,7 @@ def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, o
 		xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz")
 		snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file
 		cluster_sql_file = options.cluster_sql_file if sim_tag is None else options.injection_sql_file
+		likelihood_job = jobs['calcLikelihood'] if sim_tag is None else jobs['calcLikelihoodInj']
 
 		# cluster sub banks
 		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents,
@@ -748,9 +753,9 @@ def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, o
 			)
 
 		# assign likelihoods
-		likelihood_node = dagparts.DAGNode(jobs['calcLikelihood'], dag,
+		likelihood_node = dagparts.DAGNode(likelihood_job, dag,
 			parent_nodes = [cluster_node],
-			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			opts = {"tmp-space": dagparts.condor_scratch_space(), "force": ""},
 			input_files = {"likelihood-url":likelihood_url, "": xml}
 			)
 
@@ -952,12 +957,14 @@ def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes
 	"""
 	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
 	filesuffixs = ['', '_with_zerolag']
+	if options.marginalized_likelihood_file: ### injection-only run
+		assert not margnodes, "no marg nodes should be produced in an injection-only DAG"
+		margnodes = [None, None]
 
 	for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
 		if options.marginalized_likelihood_file: ### injection-only run
 			parents = final_sqlite_nodes
 			marginalized_likelihood_file = margfile
-
 		else:
 			parents = [margnode] + final_sqlite_nodes
 			marginalized_likelihood_file = margnode.output_files["output"]
@@ -1052,6 +1059,16 @@ def load_analysis_output(options):
 			lloid_diststats.setdefault(ce.description.split("_")[0], []).append(ce.path)
 
 	# load svd dtdphi map
+	svd_dtdphi_map, instrument_set = load_svd_dtdphi_map(options)
+
+	# modify injections option, as is done in 'adapt_inspiral_output'
+	# FIXME: don't do this, find a cleaner way of handling this generally
+	options.injections = [inj.split(':')[-1] for inj in options.injections]
+
+	return bgbin_lloid_map, lloid_diststats, svd_dtdphi_map, instrument_set
+
+
+def load_svd_dtdphi_map(options):
 	svd_dtdphi_map = {}
 	bank_cache = load_bank_cache(options)
 	instrument_set = bank_cache.keys()
@@ -1061,11 +1078,7 @@ def load_analysis_output(options):
 			for i, individual_svd_cache in enumerate(ce.path for ce in map(CacheEntry, open(svd_caches))):
 				svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j]
 
-	# modify injections option, as is done in 'adapt_inspiral_output'
-	# FIXME: don't do this, find a cleaner way of handling this generally
-	options.injections = [inj.split(':')[-1] for inj in options.injections]
-
-	return bgbin_lloid_map, lloid_diststats, svd_dtdphi_map, instrument_set
+	return svd_dtdphi_map, instrument_set
 
 
 def get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options):
-- 
GitLab