Commit 83ede53b authored by Patrick Godwin

gstlal_inspiral_pipe, inspiral_pipe.py: fixes to allow generation of injection-only DAGs

parent 19eb3c44
Pipeline #78340 passed with stages in 24 minutes and 23 seconds
gstlal_inspiral_pipe

@@ -128,6 +128,7 @@ def parse_command_line():
     parser.add_option("--injections", action = "append", help = "append injection files to analyze. Must prepend filename with X:Y:, where X and Y are floats, e.g. 1.2:3.1:filename, so that the injections are only searched for in regions of the template bank with X <= chirp mass < Y.")
     # Data from a zero lag run in the case of an injection-only run.
+    parser.add_option("--injection-only", default = False, action = "store_true", help = "Run an injection-only analysis.")
     parser.add_option("--dist-stats-cache", metavar = "filename", help = "Set the cache file for dist stats (required iff running injection-only analysis)")
     parser.add_option("--svd-bank-cache", metavar = "filename", help = "Set the cache file for svd banks (required iff running injection-only analysis)")
     parser.add_option("--psd-cache", metavar = "filename", help = "Set the cache file for psd (required iff running injection-only analysis)")
@@ -135,13 +136,6 @@ def parse_command_line():
     parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
     parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")
-    # Data from a previous run in the case of a run that starts at the merger step
-    parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
-    parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
-    parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
-    parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
-    parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
     # Condor commands
     parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
     parser.add_option("--max-inspiral-jobs", type = "int", metavar = "jobs", help = "Set the maximum number of gstlal_inspiral jobs to run simultaneously, default no constraint.")
@@ -191,21 +185,10 @@ def parse_command_line():
     if len(missing_injection_options) == 0 and options.num_banks:
         raise ValueError("can't specify --num-banks in injection-only run")
-    missing_merger_options = []
-    for option in ("lloid_cache", "inj_lloid_cache", "rank_pdf_cache"):
-        if getattr(options, option) is None:
-            missing_merger_options.append(option)
-    if len(missing_merger_options) > 0 and len(missing_merger_options) < 3:
-        raise ValueError("missing merger-run options %s." % ", ".join(missing_merger_options))
-    if len(missing_merger_options) == 0 and options.num_banks:
-        raise ValueError("can't specify --num-banks in a run starting at the merger step")

     fail = ""
     required_options = []
-    if len(missing_merger_options) == 3:
-        required_options.append("reference_likelihood_file")
     if len(missing_injection_options) == 6:
         required_options.append("bank_cache")

     for option in required_options:
         if getattr(options, option) is None:
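The surviving check enforces an all-or-nothing contract: the injection-only inputs must either all be supplied or all be absent. A sketch of that pattern, with the option tuple partly assumed (the excerpt above names five of the six options the count implies):

```python
from types import SimpleNamespace

# Assumed list of the injection-only inputs; the diff excerpt shows five.
INJECTION_ONLY_OPTIONS = (
    "dist_stats_cache", "svd_bank_cache", "psd_cache",
    "marginalized_likelihood_file", "marginalized_likelihood_with_zerolag_file",
)

def check_injection_only(options):
    missing = [o for o in INJECTION_ONLY_OPTIONS if getattr(options, o) is None]
    if 0 < len(missing) < len(INJECTION_ONLY_OPTIONS):
        raise ValueError("missing injection-run options %s." % ", ".join(missing))
    return not missing  # True only when every injection-only input is given

opts = SimpleNamespace(**{o: None for o in INJECTION_ONLY_OPTIONS})
assert check_injection_only(opts) is False  # none given: an ordinary zerolag run
```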
@@ -275,8 +258,7 @@ def set_up_jobs(options):
     inspiral_2ifo_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inspiral_2ifo_condor_opts)
     inspiral_3ifo_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inspiral_3ifo_condor_opts)

-    if options.dist_stats_cache:
+    # injection-only run
+    if options.injection_only:
         jobs['gstlalInspiral1IFO'] = None
         jobs['gstlalInspiral2IFO'] = None
         jobs['gstlalInspiral3IFO'] = None
@@ -287,13 +269,7 @@ def set_up_jobs(options):
         jobs['marginalize'] = None
         jobs['marginalizeWithZerolag'] = None
-    elif options.lloid_cache:
-        # analysis starting at merger step
-        jobs['marginalize'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = base_condor_commands)
-        jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = base_condor_commands)
     else:
         # set up jobs only needed for zerolag run
         jobs['refPSD'] = dagparts.DAGJob("gstlal_reference_psd", condor_commands = ref_psd_condor_commands)
         jobs['medianPSD'] = dagparts.DAGJob("gstlal_median_of_psds", condor_commands = base_condor_commands)
         jobs['plotBanks'] = dagparts.DAGJob("gstlal_inspiral_plot_banks", condor_commands = base_condor_commands)
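Note the convention at work in set_up_jobs: in an injection-only DAG the zerolag-only jobs are registered as None rather than omitted, so downstream layers can look keys up unconditionally and skip the disabled ones. A self-contained sketch (strings stand in for dagparts.DAGJob instances):

```python
def set_up_jobs_sketch(injection_only):
    jobs = {}
    if injection_only:
        # keys exist, but None marks the job as disabled for this DAG
        jobs['marginalize'] = None
        jobs['marginalizeWithZerolag'] = None
    else:
        jobs['marginalize'] = "gstlal_inspiral_marginalize_likelihood"
        jobs['marginalizeWithZerolag'] = "gstlal_inspiral_marginalize_likelihood_with_zerolag"
    return jobs

jobs = set_up_jobs_sketch(injection_only=True)
runnable = [name for name, job in jobs.items() if job is not None]
assert not runnable  # nothing to schedule under these keys in an injection-only DAG
```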
@@ -380,7 +356,7 @@ if __name__ == '__main__':
         template_mchirp_dict, bank_cache, max_time = inspiral_pipe.get_bank_params(options)
         instrument_set = bank_cache.keys()
-    elif options.psd_cache:
+    if options.psd_cache:
         template_mchirp_dict, svd_nodes, max_time = inspiral_pipe.get_svd_bank_params(options.svd_bank_cache)
         instrument_set = svd_nodes.keys()
@@ -390,10 +366,7 @@ if __name__ == '__main__':
     if options.psd_cache:
         ### reference psd jobs
         psd_nodes, ref_psd_parent_nodes = inspiral_pipe.inj_psd_layer(segsdict, options)
-    elif options.lloid_cache:
-        # starting analysis at merger step, nothing to do here
-        pass
+        ref_psd = inspiral_pipe.load_reference_psd(options)
     elif options.reference_psd is None:
         # Compute the PSDs for each segment
@@ -413,12 +386,12 @@ if __name__ == '__main__':
         ref_psd_parent_nodes = []

     # Calculate Expected SNR jobs
-    if not options.lloid_cache and not options.disable_calc_inj_snr:
+    if not options.disable_calc_inj_snr:
         ligolw_add_nodes = inspiral_pipe.expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100)
     else:
         ligolw_add_nodes = []

-    if options.bank_cache:
+    if not options.injection_only:
         # Compute SVD banks
         svd_nodes, template_mchirp_dict, svd_dtdphi_map = inspiral_pipe.svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, output_dir, template_mchirp_dict)
@@ -427,16 +400,16 @@ if __name__ == '__main__':
     else:
         model_node = None
         model_file = options.mass_model_file
+        svd_dtdphi_map, _ = inspiral_pipe.load_svd_dtdphi_map(options)

-    if not options.lloid_cache:
-        # Inspiral jobs by segment
-        inspiral_nodes, lloid_output, lloid_diststats = inspiral_pipe.inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
+    # Inspiral jobs by segment
+    inspiral_nodes, lloid_output, lloid_diststats = inspiral_pipe.inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)

-        # marginalize jobs
-        marg_nodes = inspiral_pipe.marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map)
+    # marginalize jobs
+    marg_nodes = inspiral_pipe.marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map)

-        # calc rank PDF jobs
-        rankpdf_nodes, rankpdf_zerolag_nodes = inspiral_pipe.calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
+    # calc rank PDF jobs
+    rankpdf_nodes, rankpdf_zerolag_nodes = inspiral_pipe.calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)

     # final marginalization step
     final_marg_nodes, margfiles_to_delete = inspiral_pipe.final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, options)
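With the merger-step branch gone, the main script always chains these layers end to end; each layer's nodes become the parents of the next. A toy sketch of that chaining (a plain list stands in for the dagparts DAG):

```python
def layer(dag, name, parents):
    # each "layer" appends one node and hands it to the next layer as a parent
    node = {"name": name, "parents": parents}
    dag.append(node)
    return [node]

dag = []
psd_nodes = layer(dag, "reference_psd", [])
inspiral_nodes = layer(dag, "inspiral", psd_nodes)
marg_nodes = layer(dag, "marginalize", inspiral_nodes)
rankpdf_nodes = layer(dag, "calc_rank_pdf", marg_nodes)
layer(dag, "final_marginalize", rankpdf_nodes)
assert [n["name"] for n in dag] == ["reference_psd", "inspiral", "marginalize", "calc_rank_pdf", "final_marginalize"]
```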
inspiral_pipe.py
@@ -711,8 +711,12 @@ def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, optio
             inputs = [o[0] for o in outputs]
             parents = dagparts.flatten([o[1] for o in outputs])
-            parents.append(marg_nodes[bin_key])
-            likelihood_url = marg_nodes[bin_key].output_files["output"]
+            if bin_key in marg_nodes:
+                parents.append(marg_nodes[bin_key])
+                likelihood_url = marg_nodes[bin_key].output_files["output"]
+            else:
+                likelihood_url = lloid_diststats[bin_key][0]
             likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, likelihood_url, parents)

     return likelihood_nodes
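This is the key fix for injection-only DAGs: marg_nodes can be empty, in which case the likelihood URL falls back to the first precomputed dist-stats file for the bin. A runnable sketch with invented keys and paths (the real values are DAGNode objects and cache entries):

```python
marg_nodes = {}  # empty in an injection-only run
lloid_diststats = {"0000": ["0000_MARG_DIST_STATS.xml.gz"]}  # from --dist-stats-cache

bin_key, parents = "0000", []
if bin_key in marg_nodes:
    parents.append(marg_nodes[bin_key])
    likelihood_url = marg_nodes[bin_key].output_files["output"]  # DAGNode attribute in the real code
else:
    likelihood_url = lloid_diststats[bin_key][0]

assert likelihood_url == "0000_MARG_DIST_STATS.xml.gz" and not parents
```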
@@ -728,6 +732,7 @@ def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, o
         xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz")
         snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file
         cluster_sql_file = options.cluster_sql_file if sim_tag is None else options.injection_sql_file
+        likelihood_job = jobs['calcLikelihood'] if sim_tag is None else jobs['calcLikelihoodInj']

         # cluster sub banks
         cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents,
@@ -748,9 +753,9 @@ def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, o
         )

         # assign likelihoods
-        likelihood_node = dagparts.DAGNode(jobs['calcLikelihood'], dag,
+        likelihood_node = dagparts.DAGNode(likelihood_job, dag,
             parent_nodes = [cluster_node],
-            opts = {"tmp-space": dagparts.condor_scratch_space()},
+            opts = {"tmp-space": dagparts.condor_scratch_space(), "force": ""},
             input_files = {"likelihood-url": likelihood_url, "": xml}
         )
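The new likelihood_job binding dispatches on sim_tag: zerolag databases (sim_tag is None) keep the calcLikelihood job, injection databases get the calcLikelihoodInj variant, and both now pass "force" with an empty value, which presumably renders as a bare --force flag on the command line. A sketch of the dispatch (placeholder strings instead of DAGJob instances):

```python
jobs = {
    "calcLikelihood": "gstlal_inspiral_calc_likelihood",
    "calcLikelihoodInj": "gstlal_inspiral_calc_likelihood_inj",  # tag name is assumed
}

for sim_tag in (None, "BNS_INJ"):  # "BNS_INJ" is an invented injection tag
    likelihood_job = jobs['calcLikelihood'] if sim_tag is None else jobs['calcLikelihoodInj']
    print(sim_tag, "->", likelihood_job)
```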
@@ -952,12 +957,14 @@ def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes
     """
     margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_with_zerolag_file]
     filesuffixs = ['', '_with_zerolag']

+    if options.marginalized_likelihood_file: ### injection-only run
+        assert not margnodes, "no marg nodes should be produced in an injection-only DAG"
+        margnodes = [None, None]

     for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
         if options.marginalized_likelihood_file: ### injection-only run
             parents = final_sqlite_nodes
             marginalized_likelihood_file = margfile
         else:
             parents = [margnode] + final_sqlite_nodes
             marginalized_likelihood_file = margnode.output_files["output"]
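The [None, None] padding exists only so the zip keeps yielding one iteration per output flavor when there are no marginalization nodes. Runnable with invented file names:

```python
margnodes = [None, None]  # injection-only run: no marg nodes
margfiles = ["marg_likelihood.xml.gz", "marg_likelihood_with_zerolag.xml.gz"]
filesuffixs = ['', '_with_zerolag']

for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
    # with margnode None, the likelihood file comes straight from the options
    marginalized_likelihood_file = margfile if margnode is None else margnode.output_files["output"]
    print(repr(filesuffix), "->", marginalized_likelihood_file)
```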
@@ -1052,6 +1059,16 @@ def load_analysis_output(options):
         lloid_diststats.setdefault(ce.description.split("_")[0], []).append(ce.path)

     # load svd dtdphi map
+    svd_dtdphi_map, instrument_set = load_svd_dtdphi_map(options)

+    # modify injections option, as is done in 'adapt_inspiral_output'
+    # FIXME: don't do this, find a cleaner way of handling this generally
+    options.injections = [inj.split(':')[-1] for inj in options.injections]

     return bgbin_lloid_map, lloid_diststats, svd_dtdphi_map, instrument_set

+def load_svd_dtdphi_map(options):
+    svd_dtdphi_map = {}
+    bank_cache = load_bank_cache(options)
+    instrument_set = bank_cache.keys()
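The injections munging flagged by the FIXME just strips the X:Y: chirp-mass prefix so downstream code sees bare file names. It is idempotent on bare names, though it would also truncate a path that itself contains a colon:

```python
injections = ["1.2:3.1:bns_injections.xml", "plain_injections.xml"]  # invented names
injections = [inj.split(':')[-1] for inj in injections]
assert injections == ["bns_injections.xml", "plain_injections.xml"]
```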
@@ -1061,11 +1078,7 @@ def load_analysis_output(options):
     for i, individual_svd_cache in enumerate(ce.path for ce in map(CacheEntry, open(svd_caches))):
         svd_dtdphi_map["%04d" % (i + bin_offset)] = options.dtdphi_file[j]

-    # modify injections option, as is done in 'adapt_inspiral_output'
-    # FIXME: don't do this, find a cleaner way of handling this generally
-    options.injections = [inj.split(':')[-1] for inj in options.injections]

-    return bgbin_lloid_map, lloid_diststats, svd_dtdphi_map, instrument_set
+    return svd_dtdphi_map, instrument_set

 def get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options):
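For reference, the map that the new load_svd_dtdphi_map builds pairs zero-padded background-bin keys with dtdphi files, offsetting keys by the bins consumed by earlier caches. A sketch with invented cache contents:

```python
svd_dtdphi_map = {}
dtdphi_files = ["bns_dtdphi.h5", "bbh_dtdphi.h5"]          # stand-ins for options.dtdphi_file
svd_caches = [["bank0.xml", "bank1.xml"], ["bank2.xml"]]   # stand-ins for the per-bank SVD caches

bin_offset = 0
for j, cache in enumerate(svd_caches):
    for i, _ in enumerate(cache):
        svd_dtdphi_map["%04d" % (i + bin_offset)] = dtdphi_files[j]
    bin_offset += len(cache)

assert svd_dtdphi_map == {"0000": "bns_dtdphi.h5", "0001": "bns_dtdphi.h5", "0002": "bbh_dtdphi.h5"}
```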