diff --git a/gstlal-inspiral/bin/gstlal_inspiral_dag b/gstlal-inspiral/bin/gstlal_inspiral_dag
index 099ddd8bf5d55fb398fa3d4834f3d9cd0fbb1897..eed28595da17d75c577472cba42e9c5e136c61bf 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_dag
+++ b/gstlal-inspiral/bin/gstlal_inspiral_dag
@@ -280,7 +280,8 @@ def parse_command_line():
 	parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
 	parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")
 
-	# Data from a previous run in the case of a run that starts at the merger step
+	# Data from a previous run in the case of a run that starts at the post-processing step
+	parser.add_option("--postprocess-only", action = "store_true", default=False, help = "Start the analysis at the post-processing step.")
 	parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
 	parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
 	parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
@@ -399,6 +400,7 @@ def set_up_jobs(options):
 	base_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})
 	ref_psd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
 	calc_rank_pdf_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})
+	calc_likelihood_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"24GB", "want_graceful_removal":"True", "kill_sig":"15"})
 	svd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"})
 	inj_snr_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
 	sh_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})
@@ -437,7 +439,7 @@ def set_up_jobs(options):
 	jobs['createPriorDistStats'] = dagparts.DAGJob("gstlal_inspiral_create_prior_diststats", condor_commands = base_condor_commands)
 	jobs['calcRankPDFs'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", condor_commands = calc_rank_pdf_condor_commands)
 	jobs['calcRankPDFsWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", tag_base="gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands=calc_rank_pdf_condor_commands)
-	jobs['calcLikelihoodByBin'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", condor_commands = base_condor_commands)
+	jobs['calcLikelihoodByBin'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", condor_commands = calc_likelihood_condor_commands)
 	jobs['marginalize'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = base_condor_commands)
 	jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base="gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands=base_condor_commands)
 
@@ -447,7 +449,7 @@ def set_up_jobs(options):
 	jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
 	jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
 	jobs['mergeAndReduce'] = dagparts.DAGJob("gstlal_inspiral_merge_and_reduce", condor_commands = base_condor_commands)
-	jobs['calcLikelihoodByBinInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", tag_base='gstlal_inspiral_calc_likelihood_by_bin_inj', condor_commands=base_condor_commands)
+	jobs['calcLikelihoodByBinInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood_by_bin", tag_base='gstlal_inspiral_calc_likelihood_by_bin_inj', condor_commands=calc_likelihood_condor_commands)
 	jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
 	jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
 	jobs['toSqlite'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = base_condor_commands)
@@ -898,18 +900,14 @@ def summary_plot_layer(dag, jobs, parent_nodes, options):
 
 	return plotnodes
 
-def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete):
-	"""clean intermediate merger products
+def clean_intermediate_products_layer(dag, jobs, nodes, files_to_delete):
+	"""clean intermediate products
 	"""
-	for db in dbs_to_delete:
-		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
-			input_files = {"": db}
+	for file_ in files_to_delete:
+		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = nodes,
+			input_files = {"": file_}
 		)
-	for margfile in margfiles_to_delete:
-		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
-			input_files = {"": margfile}
-		)
 
 	return None
 
 def inj_psd_layer(segsdict, options):
@@ -1012,7 +1010,7 @@ def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument
 def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set):
 	likelihood_nodes = {}
 	instruments = "".join(sorted(instrument_set))
-	chunk_size = 16
+	chunk_size = 128
 
 	# convert final db to ligolw
 	noninjxml = noninjdb.replace('.sqlite','.xml.gz')
@@ -1023,13 +1021,14 @@ def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb
 	)
 
 	parent_node = dbnode
-	for bgbin_index in sorted(lloid_output[None].keys()):
+	for bgbin_indices in dagparts.groups(sorted(lloid_output[None].keys()), chunk_size):
 		parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
-			parent_nodes = [parent_node, marg_nodes[bgbin_index]],
+			parent_nodes = [parent_node] + [marg_nodes[idx] for idx in bgbin_indices],
 			opts = {"tmp-space": dagparts.condor_scratch_space()},
 			input_files = {
-				"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
+				"likelihood-url": [marg_nodes[idx].output_files["output"] for idx in bgbin_indices],
 				"": dbnode.output_files["extract"],
+				"force": "",
 			}
 		)
 	likelihood_nodes[None] = [parent_node]
@@ -1048,52 +1047,83 @@ def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb
 		)
 
 		parent_node = injdbnode
-		for bgbin_index in sorted(lloid_nodes.keys()):
+		for bgbin_indices in dagparts.groups(sorted(lloid_nodes.keys()), chunk_size):
 			parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
-				parent_nodes = [parent_node, marg_nodes[bgbin_index]],
+				parent_nodes = [parent_node] + [marg_nodes[idx] for idx in bgbin_indices],
 				opts = {"tmp-space": dagparts.condor_scratch_space()},
 				input_files = {
-					"likelihood-url": marg_nodes[bgbin_index].output_files["output"],
+					"likelihood-url": [marg_nodes[idx].output_files["output"] for idx in bgbin_indices],
 					"": injdbnode.output_files["extract"],
+					"force": "",
 				}
 			)
 		likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
 
 	return likelihood_nodes
 
-def merge_in_bin_layer(dag, jobs, options):
-	rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
-	rankpdf_zerolag_nodes = []
-	outnodes = {}
-	bgbin_lloid_map = {}
+def likelihood_from_cache_layer(dag, jobs, options, boundary_seg, instrument_set):
 	# Get list of all files for each background bin (will be same length for each bin)
-	for ce in map(CacheEntry, open(options.lloid_cache)):
-		bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+	bgbin_likelihood_map = {}
+	for ce in map(CacheEntry, open(options.likelihood_cache)):
+		bgbin_likelihood_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
 
-	if len(bgbin_lloid_map.values()[0]) == 1:
-		# Starting with 1:1 mapping between files and bins,
-		# thus no merging is needed yet
-		outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
-		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-			outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
+	bgbin_db_map = {}
+	for ce in map(CacheEntry, open(options.database_cache)):
+		bgbin_db_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
 
-	else:
-		for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-			db = inputs_to_db(jobs, dbs)
-			sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options.cluster_sql_file)
-			outnodes.setdefault(None, []).append(sqlitenode)
+	likelihood_nodes = {}
+	instruments = "".join(sorted(instrument_set))
+	chunk_size = 128
+
+	# convert noninjdb to ligolw
+	noninjxml = noninjdb.replace('.sqlite','.xml.gz')
+	dbnode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [],
+		opts = {"tmp-space": dagparts.condor_scratch_space()},
+		output_files = {"extract": noninjxml},
+		input_files = {"database": noninjdb}
+	)
+
+	# first, non-injection jobs
+	parent_node = dbnode
+	for likelihood_urls in dagparts.groups(sorted(bgbin_likelihood_map[None].values()), chunk_size):
+		parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBin'], dag,
+			parent_nodes = [parent_node],
+			opts = {"tmp-space": dagparts.condor_scratch_space()},
+			input_files = {
+				"likelihood-url": likelihood_urls,
+				"": dbnode.output_files["extract"],
+				"force": "",
+			}
+		)
+	likelihood_nodes[None] = [parent_node]
+
+	# then injections
+	for inj, injdb in zip(options.injections, injdbs):
+		all_likelihood_urls = bgbin_likelihood_map[sim_tag_from_inj_file(inj)].values()
 
-		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-			bgbin_lloid_map = {}
-			for ce in map(CacheEntry, open(inj_lloid_cache)):
-				bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+		# convert final db to ligolw
+		injxml = injdb.replace('.sqlite','.xml.gz')
+		injdbnode = dagparts.DAGNode(jobs['toXML'], dag,
+			parent_nodes = final_sqlite_nodes[sim_tag_from_inj_file(inj)],
+			opts = {"tmp-space": dagparts.condor_scratch_space()},
+			output_files = {"extract": injxml},
+			input_files = {"database": injdb}
+		)
 
-		for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-			injdb = inputs_to_db(jobs, dbs)
-			sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, inputs, options.injection_sql_file)
-			outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
+		parent_node = injdbnode
+		for likelihood_urls in dagparts.groups(sorted(all_likelihood_urls), chunk_size):
+			parent_node = dagparts.DAGNode(jobs['calcLikelihoodByBinInj'], dag,
+				parent_nodes = [parent_node],
+				opts = {"tmp-space": dagparts.condor_scratch_space()},
+				input_files = {
+					"likelihood-url": likelihood_urls,
+					"": injdbnode.output_files["extract"],
+					"force": "",
+				}
+			)
+		likelihood_nodes[sim_tag_from_inj_file(inj)] = [parent_node]
 
-	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
+	return likelihood_nodes
 
 def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg):
 	"""cluster and merge lloid output into a single database
@@ -1101,7 +1131,7 @@ def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, bound
 	sqlite_nodes = {}
 	dbs_to_delete = []
 	instruments = "".join(sorted(instrument_set))
-	chunk_size = 10
+	chunk_size = 20
 
 	# define lloid files
 	bgbin_indices = sorted(lloid_output[None].keys())
@@ -1186,7 +1216,7 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 		try:
 			margin = [node.output_files["output"] for node in nodes]
 			parents = nodes
-		except AttributeError: ### analysis started at merger step
+		except AttributeError: ### analysis started at post-processing step
 			margin = nodes
 			parents = []
 
@@ -1329,8 +1359,8 @@ if __name__ == '__main__':
 		### reference psd jobs
 		psd_nodes, ref_psd_parent_nodes = inj_psd_layer(segsdict, options)
 
-	elif options.lloid_cache:
-		# starting analysis at merger step, nothing to do here
+	elif options.postprocess_only:
+		# postprocess-only analysis, nothing to do here
 		pass
 
 	elif options.reference_psd is None:
@@ -1351,7 +1381,7 @@ if __name__ == '__main__':
 		ref_psd_parent_nodes = []
 
 	# Calculate Expected SNR jobs
-	if not options.lloid_cache and not options.disable_calc_inj_snr:
+	if not options.postprocess_only and not options.disable_calc_inj_snr:
 		ligolw_add_nodes = expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100)
 	else:
 		ligolw_add_nodes = []
@@ -1363,7 +1393,7 @@ if __name__ == '__main__':
 	# mass model
 	model_node, model_file = mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd)
 
-	if not options.lloid_cache:
+	if not options.postprocess_only:
 		# Inspiral jobs by segment
 		lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
 
@@ -1373,20 +1403,22 @@ if __name__ == '__main__':
 		# calc rank PDF jobs
 		rankpdf_nodes, rankpdf_zerolag_nodes = calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
 
+		# Setup clustering and/or merging
+		sqlite_nodes, injdbs, noninjdb, dbs_to_delete = cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg)
+
+		# likelihood jobs
+		likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set)
 	else:
-		# Merge lloid files into 1 file per bin if not already 1 file per bin
-		lloid_output = retrieve_lloid_output(dag, jobs, options)
-		rankpdf_nodes, rankpdf_zerolag_nodes, marg_nodes = merge_in_bin_layer(dag, jobs, options)
+		# load rankpdf cache
+		rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
+		rankpdf_zerolag_nodes = []
 
-	# Setup clustering and/or merging
-	sqlite_nodes, injdbs, noninjdb, dbs_to_delete = cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg)
+		# likelihood jobs
+		likelihood_nodes = likelihood_from_cache_layer(dag, jobs, options, boundary_seg, instrument_set)
 
 	# final marginalization step
 	final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes)
 
-	# likelihood jobs
-	likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set)
-
 	# Compute FAR
 	farnode = compute_far_layer(dag, jobs, likelihood_nodes, final_marg_nodes, injdbs, noninjdb)
 
@@ -1396,8 +1428,10 @@ if __name__ == '__main__':
 	# make a web page
 	summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs)
 
-	# rm intermediate merger products
-	clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete)
+	# rm intermediate margfiles/sqlite files
+	clean_intermediate_products_layer(dag, jobs, plotnodes, margfiles_to_delete)
+	clean_intermediate_products_layer(dag, jobs, plotnodes, dbs_to_delete)
+
 
 	#
 	# generate DAG files