Commit 5dc20a30 authored by Patrick Godwin

gstlal_inspiral_dag: rework analysis if starting at post-processing step, add parallelization to calc_likelihood_by_bin jobs, remove unused func
parent dbe9015a
Pipeline #67027 passed with stages in 34 minutes and 38 seconds
@@ -280,7 +280,8 @@ def parse_command_line():
parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")
# Data from a previous run in the case of a run that starts at the merger step
# Data from a previous run in the case of a run that starts at the post-processing step
parser.add_option("--postprocess-only", action = "store_true", default=False, help = "Start the analysis at the post-processing step.")
parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
@@ -399,6 +400,7 @@ def set_up_jobs(options):
base_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})
ref_psd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
calc_rank_pdf_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})
calc_likelihood_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"24GB", "want_graceful_removal":"True", "kill_sig":"15"})
svd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"})
inj_snr_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
sh_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})
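The new calc_likelihood_condor_commands entry gives the chunked likelihood jobs a 24 GB memory request, built the same way as the other per-job dicts: start from job-specific defaults and fold in any --condor-command overrides. The snippet below is a minimal sketch of that merging pattern, not the dagparts implementation; the "key=value" option format is an assumption.

# Minimal sketch of how a per-job Condor command dict can be assembled from
# defaults plus user overrides. Illustrative only; the real logic lives in
# dagparts.condor_command_dict_from_opts, and the "key=value" format of the
# --condor-command options is assumed here.
def condor_commands_from_opts(condor_command_opts, defaults):
    commands = dict(defaults)
    for opt in condor_command_opts:            # e.g. ["accounting_group=ligo.dev.o3.cbc"]
        key, _, value = opt.partition("=")
        commands[key.strip()] = value.strip()  # user-supplied values win
    return commands

calc_likelihood_condor_commands = condor_commands_from_opts(
    [],  # options.condor_command
    {"request_memory": "24GB", "want_graceful_removal": "True", "kill_sig": "15"},
)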
@@ -437,7 +439,7 @@ def set_up_jobs(options):
jobs['createPriorDistStats'] = dagparts.DAGJob("gstlal_inspiral_create_prior_diststats", condor_commands = base_condor_commands)
jobs['calcRankPDFs'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", condor_commands = calc_rank_pdf_condor_commands)
jobs['calcRankPDFsWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", tag_base="gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands=calc_rank_pdf_condor_commands)
jobs['calcLikelihoodByBin'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", condor_commands = base_condor_commands)
jobs['calcLikelihoodByBin'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", condor_commands = calc_likelihood_condor_commands)
jobs['marginalize'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = base_condor_commands)
jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base="gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands=base_condor_commands)
@@ -447,7 +449,7 @@ def set_up_jobs(options):
jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
jobs['mergeAndReduce'] = dagparts.DAGJob("gstlal_inspiral_merge_and_reduce", condor_commands = base_condor_commands)
jobs['calcLikelihoodByBinInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", tag_base='gstlal_inspiral_calc_likelihood_by_bin_inj', condor_commands=base_condor_commands)
jobs['calcLikelihoodByBinInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", tag_base='gstlal_inspiral_calc_likelihood_by_bin_inj', condor_commands=calc_likelihood_condor_commands)
jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
jobs['toSqlite'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = base_condor_commands)
@@ -898,18 +900,14 @@ def summary_plot_layer(dag, jobs, parent_nodes, options):
return plotnodes
def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete):
"""clean intermediate merger products
def clean_intermediate_products_layer(dag, jobs, nodes, files_to_delete):
"""clean intermediate products
"""
for db in dbs_to_delete:
dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
input_files = {"": db}
for file_ in files_to_delete:
dagparts.DAGNode(jobs['rm'], dag, parent_nodes = nodes,
input_files = {"": file_}
)
for margfile in margfiles_to_delete:
dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
input_files = {"": margfile}
)
return None
def inj_psd_layer(segsdict, options):
@@ -1012,7 +1010,7 @@ def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument
def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set):
likelihood_nodes = {}
instruments = "".join(sorted(instrument_set))
chunk_size = 16
chunk_size = 128
# convert final db to ligolw
noninjxml = noninjdb.replace('.sqlite','.xml.gz')
@@ -1023,13 +1021,14 @@ def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb
)
parent_node = dbnode
for bgbin_index in sorted(lloid_output[None].keys()):
for bgbin_indices in dagparts.groups(sorted(lloid_output[None].keys()), chunk_size):
parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
parent_nodes = [parent_node, marg_nodes[bgbin_index]],
parent_nodes = [parent_node] + [marg_nodes[idx] for idx in bgbin_indices],
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {
"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
"likelihood-url": [marg_nodes[idx].output_files["output"] for idx in bgbin_indices],
"": dbnode.output_files["extract"],
"force": "",
}
)
likelihood_nodes[None] = [parent_node]
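The loop above is the parallelization change from the commit message: instead of one gstlal_inspiral_calc_likelihood_by_bin node per background bin, bins are grouped into chunks of 128, each node is handed the whole chunk's likelihood URLs, and the nodes remain chained serially through parent_node. Below is a minimal sketch of that pattern, assuming dagparts.groups is a plain fixed-size slicer; groups() and chain_likelihood_jobs() are illustrative stand-ins, not the dagparts API.

# Sketch of the chunk-and-chain pattern, assuming dagparts.groups yields
# fixed-size slices. The helper names and the example filenames are stand-ins.
def groups(items, n):
    """Yield successive chunks of at most n items."""
    for i in range(0, len(items), n):
        yield items[i:i + n]

def chain_likelihood_jobs(bgbin_indices, likelihood_url_by_bin, chunk_size=128):
    """One calc-likelihood job per chunk of bins, chained serially."""
    job_specs = []
    for chunk in groups(sorted(bgbin_indices), chunk_size):
        job_specs.append({
            "likelihood-url": [likelihood_url_by_bin[idx] for idx in chunk],
            # in the DAG, the parents are the previous chunk's job plus the
            # marginalization node of every bin in the chunk
            "previous-job": len(job_specs) - 1 if job_specs else None,
        })
    return job_specs

specs = chain_likelihood_jobs(
    ["0000", "0001", "0002"],
    {b: b + "_MARG_DIST_STATS.xml.gz" for b in ("0000", "0001", "0002")},
    chunk_size=2,
)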
@@ -1048,52 +1047,83 @@ def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb
)
parent_node = injdbnode
for bgbin_index in sorted(lloid_nodes.keys()):
for bgbin_indices in dagparts.groups(sorted(lloid_nodes.keys()), chunk_size):
parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
parent_nodes = [parent_node, marg_nodes[bgbin_index]],
parent_nodes = [parent_node] + [marg_nodes[idx] for idx in bgbin_indices],
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {
"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
"likelihood-url": [marg_nodes[idx].output_files["output"] for idx in bgbin_indices],
"": injdbnode.output_files["extract"],
"force": "",
}
)
likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
return likelihood_nodes
def merge_in_bin_layer(dag, jobs, options):
rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
rankpdf_zerolag_nodes = []
outnodes = {}
bgbin_lloid_map = {}
def likelihood_from_cache_layer(dag, jobs, options, boundary_seg, instrument_set):
# Get list of all files for each background bin (will be same length for each bin)
for ce in map(CacheEntry, open(options.lloid_cache)):
bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
bgbin_likelihood_map = {}
for ce in map(CacheEntry, open(options.likelihood_cache)):
bgbin_likelihood_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
if len(bgbin_lloid_map.values()[0]) == 1:
# Starting with 1:1 mapping between files and bins,
# thus no merging is needed yet
outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
bgbin_db_map = {}
for ce in map(CacheEntry, open(options.database_cache)):
bgbin_db_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
else:
for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
db = inputs_to_db(jobs, dbs)
sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options.cluster_sql_file)
outnodes.setdefault(None, []).append(sqlitenode)
likelihood_nodes = {}
instruments = "".join(sorted(instrument_set))
chunk_size = 128
# convert noninjdb to ligolw
noninjxml = noninjdb.replace('.sqlite','.xml.gz')
dbnode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [],
opts = {"tmp-space": dagparts.condor_scratch_space()},
output_files = {"extract": noninjxml},
input_files = {"database": noninjdb}
)
# first, non-injection jobs
parent_node = dbnode
for likelihood_urls in dagparts.groups(sorted(bgbin_likelihood_map[None].values()), chunk_size):
parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
parent_nodes = [parent_node],
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {
"likelihood-url": likelihood_urls,
"": dbnode.output_files["extract"],
"force": "",
}
)
likelihood_nodes[None] = [parent_node]
# then injections
for inj, injdb in zip(options.injections, injdbs):
all_likelihood_urls = bgbin_likelihood_map[sim_tag_from_inj_file(inj)].values()
for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
bgbin_lloid_map = {}
for ce in map(CacheEntry, open(inj_lloid_cache)):
bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
# convert final db to ligolw
injxml = injdb.replace('.sqlite','.xml.gz')
injdbnode = dagparts.DAGNode(jobs['toXML'], dag,
parent_nodes = final_sqlite_nodes[sim_tag_from_inj_file(inj)],
opts = {"tmp-space": dagparts.condor_scratch_space()},
output_files = {"extract": injxml},
input_files = {"database": injdb}
)
for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
injdb = inputs_to_db(jobs, dbs)
sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, inputs, options.injection_sql_file)
outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
parent_node = injdbnode
for likelihood_urls in dagparts.groups(sorted(all_likelihood_urls), chunk_size):
parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
parent_nodes = [parent_node],
opts = {"tmp-space": dagparts.condor_scratch_space()},
input_files = {
"likelihood-url": likelihood_urls,
"": injdbnode.output_files["extract"],
"force": "",
}
)
likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
return likelihood_nodes
def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg):
"""cluster and merge lloid output into a single database
@@ -1101,7 +1131,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
sqlite_nodes = {}
dbs_to_delete = []
instruments = "".join(sorted(instrument_set))
chunk_size = 10
chunk_size = 20
# define lloid files
bgbin_indices = sorted(lloid_output[None].keys())
@@ -1186,7 +1216,7 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
try:
margin = [node.output_files["output"] for node in nodes]
parents = nodes
except AttributeError: ### analysis started at merger step
except AttributeError: ### analysis started at post-processing step
margin = nodes
parents = []
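The updated comment on the except branch reflects the reworked entry point: in a post-processing-only run, rankpdf_nodes holds plain file paths read from --rank-pdf-cache rather than DAG nodes, so node.output_files raises AttributeError and the paths are used directly with no parent dependencies. A small self-contained illustration of that duck-typing; FakeNode is purely a stand-in for dagparts.DAGNode.

# Illustration of the node-or-path duck-typing in final_marginalize_layer.
# FakeNode stands in for dagparts.DAGNode and exists only for this example.
class FakeNode(object):
    def __init__(self, path):
        self.output_files = {"output": path}

def margfiles_and_parents(nodes):
    try:
        # full analysis: entries are DAG nodes, keep them as parents
        return [node.output_files["output"] for node in nodes], list(nodes)
    except AttributeError:
        # analysis started at the post-processing step: entries are bare paths
        return list(nodes), []

print(margfiles_and_parents([FakeNode("0000_MARG_DIST_STATS.xml.gz")]))
print(margfiles_and_parents(["0000_MARG_DIST_STATS.xml.gz"]))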
@@ -1329,8 +1359,8 @@ if __name__ == '__main__':
### reference psd jobs
psd_nodes, ref_psd_parent_nodes = inj_psd_layer(segsdict, options)
elif options.lloid_cache:
# starting analysis at merger step, nothing to do here
elif options.postprocess_only:
# postprocess-only analysis, nothing to do here
pass
elif options.reference_psd is None:
@@ -1351,7 +1381,7 @@ if __name__ == '__main__':
ref_psd_parent_nodes = []
# Calculate Expected SNR jobs
if not options.lloid_cache and not options.disable_calc_inj_snr:
if not options.postprocess_only and not options.disable_calc_inj_snr:
ligolw_add_nodes = expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100)
else:
ligolw_add_nodes = []
@@ -1363,7 +1393,7 @@ if __name__ == '__main__':
# mass model
model_node, model_file = mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd)
if not options.lloid_cache:
if not options.postprocess_only:
# Inspiral jobs by segment
lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
@@ -1373,20 +1403,22 @@ if __name__ == '__main__':
# calc rank PDF jobs
rankpdf_nodes, rankpdf_zerolag_nodes = calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
# Setup clustering and/or merging
sqlite_nodes, injdbs, noninjdb, dbs_to_delete = cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg)
# likelihood jobs
likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set)
else:
# Merge lloid files into 1 file per bin if not already 1 file per bin
lloid_output = retrieve_lloid_output(dag, jobs, options)
rankpdf_nodes, rankpdf_zerolag_nodes, marg_nodes = merge_in_bin_layer(dag, jobs, options)
# load rankpdf cache
rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
rankpdf_zerolag_nodes = []
# Setup clustering and/or merging
sqlite_nodes, injdbs, noninjdb, dbs_to_delete = cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg)
# likelihood jobs
likelihood_nodes = likelihood_from_cache_layer(dag, jobs, options, boundary_seg, instrument_set)
# final marginalization step
final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes)
# likelihood jobs
likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set)
# Compute FAR
farnode = compute_far_layer(dag, jobs, likelihood_nodes, final_marg_nodes, injdbs, noninjdb)
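With --postprocess-only, the top level now skips the inspiral, rank-PDF, and per-bin marginalization layers entirely: the rank-PDF paths come straight from --rank-pdf-cache, sorted by the bin index embedded in each filename, and the likelihood jobs are wired up from caches by likelihood_from_cache_layer. The sort key is compact, so here is what it extracts, shown with a hypothetical filename.

# What the rank-PDF sort key pulls out of each cached path. The example
# filenames are hypothetical; the key mirrors
# int(os.path.basename(s).split('-')[1].split('_')[0]) used above.
import os

def bin_index_from_path(path):
    """Background-bin index encoded after the first '-' of the basename."""
    return int(os.path.basename(path).split('-')[1].split('_')[0])

paths = [
    "H1L1-0010_GSTLAL_INSPIRAL_CALC_RANK_PDFS-1234567890-5000.xml.gz",
    "H1L1-0002_GSTLAL_INSPIRAL_CALC_RANK_PDFS-1234567890-5000.xml.gz",
]
rankpdf_nodes = sorted(paths, key=bin_index_from_path)  # 0002 before 0010
rankpdf_zerolag_nodes = []  # no zerolag rank PDFs when starting from caches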
@@ -1396,8 +1428,10 @@ if __name__ == '__main__':
# make a web page
summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs)
# rm intermediate merger products
clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete)
# rm intermediate margfiles/sqlite files
clean_intermediate_products_layer(dag, jobs, plotnodes, margfiles_to_delete)
clean_intermediate_products_layer(dag, jobs, plotnodes, dbs_to_delete)
#
# generate DAG files