Commit f3ee4be4 authored by Patrick Godwin

gstlal_inspiral_dag: DAG reorganization to allow post-processing of triggers

parent 07c7658d
......@@ -362,6 +362,8 @@ def parse_command_line():
#FIXME a hack to find the sql paths
share_path = os.path.split(dagparts.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal')
options.snr_cluster_sql_file = os.path.join(share_path, 'snr_simplify_and_cluster.sql')
options.inj_snr_cluster_sql_file = os.path.join(share_path, 'inj_snr_simplify_and_cluster.sql')
options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')
options.injection_proc_sql_file = os.path.join(share_path, 'simplify_proc_table_in_inj_file.sql')
......@@ -421,7 +423,7 @@ def set_up_jobs(options):
jobs['createPriorDistStats'] = None
jobs['calcRankPDFs'] = None
jobs['calcRankPDFsWithZerolag'] = None
jobs['calcLikelihood'] = None
jobs['calcLikelihoodByBin'] = None
jobs['marginalize'] = None
jobs['marginalizeWithZerolag'] = None
......@@ -443,7 +445,7 @@ def set_up_jobs(options):
jobs['createPriorDistStats'] = dagparts.DAGJob("gstlal_inspiral_create_prior_diststats", condor_commands = base_condor_commands)
jobs['calcRankPDFs'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", condor_commands = calc_rank_pdf_condor_commands)
jobs['calcRankPDFsWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", tag_base="gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands=calc_rank_pdf_condor_commands)
jobs['calcLikelihood'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", condor_commands = base_condor_commands)
jobs['calcLikelihoodByBin'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", condor_commands = base_condor_commands)
jobs['marginalize'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = base_condor_commands)
jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base="gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands=base_condor_commands)
......@@ -453,7 +455,7 @@ def set_up_jobs(options):
jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
jobs['mergeAndReduce'] = dagparts.DAGJob("gstlal_inspiral_merge_and_reduce", condor_commands = base_condor_commands)
jobs['calcLikelihoodInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands=base_condor_commands)
jobs['calcLikelihoodByBinInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", tag_base='gstlal_inspiral_calc_likelihood_by_bin_inj', condor_commands=base_condor_commands)
jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
jobs['toSqlite'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = base_condor_commands)
......@@ -1020,43 +1022,50 @@ def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb
instruments = "".join(sorted(instrument_set))
chunk_size = 16
bgbin_indices = sorted(lloid_output[None].keys())
for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
inputs = [o[0] for o in outputs]
# convert final db to ligolw
noninjxml = noninjdb.replace('.sqlite','.xml.gz')
dbnode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = final_sqlite_nodes[None],
opts = {"tmp-space": dagparts.condor_scratch_space()},
output_files = {"extract": noninjxml},
input_files = {"database": noninjdb}
)
# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
likelihood_nodes.setdefault(None,[]).append(
[dagparts.DAGNode(jobs['calcLikelihood'], dag,
parent_nodes = [marg_nodes[bgbin_index]],
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {"likelihood-url": marg_nodes[bgbin_index].output_files["output"]},
input_cache_files = {"input-cache": chunked_inputs}
) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
)
parent_node = dbnode
for bgbin_index in sorted(lloid_output[None].keys()):
parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
parent_nodes = [parent_node, marg_nodes[bgbin_index]],
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {
"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
"": dbnode.output_files["extract"],
}
)
likelihood_nodes[None] = [parent_node]
# then injections
for inj in options.injections:
for inj, injdb in zip(options.injections, injdbs):
lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
bgbin_indices = sorted(lloid_nodes.keys())
for n, (outputs, diststats, bgbin_index) in enumerate((lloid_nodes[key], lloid_diststats[key], key) for key in bgbin_indices):
if outputs is not None:
inputs = [o[0] for o in outputs]
parents = dagparts.flatten([o[1] for o in outputs])
if marg_nodes[bgbin_index]:
parents.append(marg_nodes[bgbin_index])
likelihood_url = marg_nodes[bgbin_index].output_files["output"]
else:
likelihood_url = diststats[0]
# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
[dagparts.DAGNode(jobs['calcLikelihoodInj'], dag,
parent_nodes = parents,
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {"likelihood-url": likelihood_url},
input_cache_files = {"input-cache": chunked_inputs}
) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
)
# convert final db to ligolw
injxml = injdb.replace('.sqlite','.xml.gz')
injdbnode = dagparts.DAGNode(jobs['toXML'], dag,
parent_nodes = final_sqlite_nodes[sim_tag_from_inj_file(inj)],
opts = {"tmp-space": dagparts.condor_scratch_space()},
output_files = {"extract": injxml},
input_files = {"database": injdb}
)
parent_node = injdbnode
for bgbin_index in sorted(lloid_nodes.keys()):
parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
parent_nodes = [parent_node, marg_nodes[bgbin_index]],
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {
"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
"": injdbnode.output_files["extract"],
}
)
likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
return likelihood_nodes
......@@ -1120,7 +1129,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
# Break up the merge/reduce jobs into chunks to process fewer files, e.g., 10
mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
parent_nodes = nodes,
opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
opts = {"sql-file": options.snr_cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
input_files = {},
input_cache_files = {"input-cache": cache},
output_files = {"database": noninjdb},
......@@ -1129,7 +1138,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
dbs_to_delete.append(noninjdb)
output_lloid.append((noninjdb, [mergenode]))
sqlite_nodes[None] = [o[1] for o in output_lloid]
sqlite_nodes[None] = dagparts.flatten([o[1] for o in output_lloid])
# then injections
injdbs = []
......@@ -1154,7 +1163,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
# Break up the merge/reduce jobs into chunks to process fewer files, e.g., 10
mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
parent_nodes = nodes,
opts = {"sql-file": options.injection_sql_file, "tmp-space": dagparts.condor_scratch_space()},
opts = {"sql-file": options.inj_snr_cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
input_files = {},
input_cache_files = {"input-cache": cache},
output_files = {"database": injdb},
......@@ -1163,7 +1172,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
dbs_to_delete.append(injdb)
output_lloid.append((injdb, [mergenode]))
sqlite_nodes[sim_tag] = [o[1] for o in output_lloid]
sqlite_nodes[sim_tag] = dagparts.flatten([o[1] for o in output_lloid])
injdbs.append(injdb)
# remove final databases from databases considered for deletion
......@@ -1222,7 +1231,7 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
def compute_far_layer(dag, jobs, likelihood_nodes, final_margnodes, injdbs, noninjdb):
"""compute FAPs and FARs
"""
all_likelihood_nodes = dagparts.flatten(dagparts.flatten(likelihood_nodes.values()))
all_likelihood_nodes = dagparts.flatten(likelihood_nodes.values())
margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
filesuffixs = ['', '_with_zerolag']
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment