diff --git a/gstlal-inspiral/bin/gstlal_inspiral_dag b/gstlal-inspiral/bin/gstlal_inspiral_dag
index b45aeb79d5ea3d4903317a997ef9c2dbc28a8e0c..d14bd938d9e0fe0b084214f0661fb77aace1ea8c 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_dag
+++ b/gstlal-inspiral/bin/gstlal_inspiral_dag
@@ -267,7 +267,6 @@ def parse_command_line():
 	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
 	parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. Separate injection runs using the --injections option will still occur.")
 	parser.add_option("--singles-threshold", default=float("inf"), action = "store", metavar="THRESH", help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
-	parser.add_option("--copy-raw-results", default=False, action = "store_true", help = "Copy raw gstlal_inspiral results before applying clustering and other lossy operations.")
 	parser.add_option("--gzip-test", default=False, action = "store_true", help = "Perform gzip --test on all output files.")
 	parser.add_option("--verbose", action = "store_true", help = "Be verbose")
 	parser.add_option("--disable-calc-inj-snr", default=False, action = "store_true", help = "Disable injection SNR calculation")
@@ -367,6 +366,8 @@ def parse_command_line():
 	#FIXME a hack to find the sql paths
 	share_path = os.path.split(dagparts.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal')
 	options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
+	options.snr_cluster_sql_file = os.path.join(share_path, 'snr_simplify_and_cluster.sql')
+	options.injection_snr_cluster_sql_file = os.path.join(share_path, 'inj_snr_simplify_and_cluster.sql')
 	options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')
 	options.injection_proc_sql_file = os.path.join(share_path, 'simplify_proc_table_in_inj_file.sql')
 
@@ -391,10 +392,10 @@ def get_threshold_values(bgbin_indices, svd_bank_strings, options):
 	else:
 		return None
 
-def inputs_to_db(jobs, inputs):
+def inputs_to_db(jobs, inputs, job_type = 'toSqlite'):
 	dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
 	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
-	return os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
+	return os.path.join(subdir_path([jobs[job_type].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
 
 def cache_to_db(cache, jobs):
 	hi_index = cache[-1].description.split('_')[0]
@@ -466,7 +467,7 @@ def set_up_jobs(options):
 	jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
 	jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
 	jobs['toSqlite'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = base_condor_commands)
-	jobs['toSqliteNoCache'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml_inj_final", condor_commands = base_condor_commands)
+	jobs['toSqliteNoCache'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml_final", condor_commands = base_condor_commands)
 	jobs['toXML'] = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_to_xml", condor_commands = base_condor_commands)
 	jobs['lalappsRunSqlite'] = dagparts.DAGJob("lalapps_run_sqlite", condor_commands = base_condor_commands)
 	jobs['plotSummary'] = dagparts.DAGJob("gstlal_inspiral_plotsummary", condor_commands = base_condor_commands)
@@ -564,7 +565,7 @@ def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_m
 				ids.append("%d_%d" % (i+bin_offset, n))
 				if f in template_mchirp_dict:
 					mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))
-				svd_dtdphi_map[i+bin_offset] = options.dtdphi_file[j]
+				svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j]
 
 			svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path)
 			if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
@@ -841,7 +842,7 @@ def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_s
 		)
 	return ligolw_add_nodes
 
-def summary_plot_layer(dag, jobs, parent_nodes, options):
+def summary_plot_layer(dag, jobs, parent_nodes, options, injdbs, noninjdb):
 	plotnodes = []
 
 	### common plot options
@@ -903,9 +904,9 @@ def summary_plot_layer(dag, jobs, parent_nodes, options):
 		input_files = {"zero-lag-database": noninjdb, "": injdbs}
 	))
 
-	opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1]}
-	opts.update(sensitivity_opts)
 	for injdb in injdbs:
+		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1]}
+		opts.update(sensitivity_opts)
 		plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode],
 			opts = opts,
 			input_files = {"zero-lag-database": noninjdb, "": injdb}
 		))
@@ -991,32 +992,40 @@ def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, optio
 	instruments = "".join(sorted(instrument_set))
 	margnodes = {}
 
-	# NOTE! we rely on there being identical templates in each instrument, so we just take one of the values of the svd_nodes which are a dictionary
-	one_ifo_svd_nodes = svd_nodes.values()[0]
+	# NOTE! we rely on there being identical templates in each instrument,
+	# so we just take one of the values of the svd_nodes which are a dictionary
+	# FIXME, the svd nodes list has to be the same as the sorted keys of
+	# lloid_output. svd nodes should be made into a dictionary much
+	# earlier in the code to prevent a mishap
+	one_ifo_svd_nodes = dict(("%04d" % n, node) for n, node in enumerate( svd_nodes.values()[0]))
+	# Here n counts the bins
 
+	# FIXME - this is broken for injection dags right now because of marg nodes
 	# first non-injections, which will get skipped if this is an injections-only run
-	bgbin_indices = sorted(lloid_output[None].keys())
-	for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
+	for bin_key in sorted(lloid_output[None].keys()):
+		outputs = lloid_output[None][bin_key]
+		diststats = lloid_diststats[bin_key]
 		inputs = [o[0] for o in outputs]
 		parents = dagparts.flatten([o[1] for o in outputs])
-		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, '%04d'%n)
+		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)
 
 		# FIXME we keep this here in case we someday want to have a
 		# mass bin dependent prior, but it really doesn't matter for
 		# the time being.
		priornode = dagparts.DAGNode(jobs['createPriorDistStats'], dag,
-			parent_nodes = [one_ifo_svd_nodes[n]] + model_node or [],
+			parent_nodes = [one_ifo_svd_nodes[bin_key]] + model_node or [],
 			opts = {
 				"instrument": instrument_set,
 				"background-prior": 1,
 				"min-instruments": options.min_instruments,
 				"coincidence-threshold":options.coincidence_threshold,
+				"synthesize-numerator": "",
 				"df": "bandwidth"
 			},
 			input_files = {
-				"svd-file": one_ifo_svd_nodes[n].output_files["write-svd"],
-				"mass-model-file": mass_model_file,
-				"dtdphi-file": svd_dtdphi_map[n],
+				"svd-file": one_ifo_svd_nodes[bin_key].output_files["write-svd"],
+				"mass-model-file": model_file,
+				"dtdphi-file": svd_dtdphi_map[bin_key],
 				"psd-xml": ref_psd
 			},
 			output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=jobs['createPriorDistStats'])}
@@ -1026,13 +1035,13 @@ def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, optio
 		# be needed to compute the likelihood
 		diststats_per_bin_node = dagparts.DAGNode(jobs['marginalize'], dag,
 			parent_nodes = [priornode] + parents,
-			opts = {"marginalize":"ranking-stat"},
+			opts = {"marginalize": "ranking-stat"},
 			input_cache_files = {"likelihood-cache": diststats + [priornode.output_files["write-likelihood"]]},
 			output_files = {"output": rankfile('MARG_DIST_STATS', job=jobs['marginalize'])},
 			input_cache_file_name = rankfile('MARG_DIST_STATS')
 		)
 
-		margnodes[bgbin_index] = diststats_per_bin_node
+		margnodes[bin_key] = diststats_per_bin_node
 
 	return margnodes
 
@@ -1042,20 +1051,20 @@ def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument
 	instruments = "".join(sorted(instrument_set))
 
 	# Here n counts the bins
-	for n, bgbin_index in enumerate(sorted(marg_nodes.keys())):
-		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, '%04d'%n)
+	for bin_key in sorted(marg_nodes.keys()):
+		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)
 
 		calcranknode = dagparts.DAGNode(jobs['calcRankPDFs'], dag,
-			parent_nodes = [marg_nodes[bgbin_index]],
+			parent_nodes = [marg_nodes[bin_key]],
 			opts = {"ranking-stat-samples":options.ranking_stat_samples},
-			input_files = {"": marg_nodes[bgbin_index].output_files["output"]},
+			input_files = {"": marg_nodes[bin_key].output_files["output"]},
 			output_files = {"output": rankfile('CALC_RANK_PDFS', job=jobs['calcRankPDFs'])},
 		)
 
 		calcrankzerolagnode = dagparts.DAGNode(jobs['calcRankPDFsWithZerolag'], dag,
-			parent_nodes = [marg_nodes[bgbin_index]],
+			parent_nodes = [marg_nodes[bin_key]],
 			opts = {"add-zerolag-to-background": "", "ranking-stat-samples": options.ranking_stat_samples},
-			input_files = {"": marg_nodes[bgbin_index].output_files["output"]},
+			input_files = {"": marg_nodes[bin_key].output_files["output"]},
 			output_files = {"output": rankfile('CALC_RANK_PDFS_WZL', job=jobs['calcRankPDFsWithZerolag'])},
 		)
 
@@ -1067,145 +1076,84 @@
 def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
 	likelihood_nodes = {}
 	instruments = "".join(sorted(instrument_set))
-	chunk_size = 100
 
-	bgbin_indices = sorted(lloid_output[None].keys())
-	for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
-		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, '%04d'%n)
+	# non-injection jobs
+	for bin_key in sorted(lloid_output[None].keys()):
+		outputs = lloid_output[None][bin_key]
+		diststats = lloid_diststats[bin_key]
 		inputs = [o[0] for o in outputs]
-		# Break up the likelihood jobs into chunks to process fewer files, e.g, 16
-		likelihood_nodes.setdefault(None,[]).append(
-			[dagparts.DAGNode(jobs['calcLikelihood'], dag,
-				parent_nodes = [marg_nodes[bgbin_index]],
-				opts = {"tmp-space": dagparts.condor_scratch_space()},
-				input_files = {"likelihood-url": marg_nodes[bgbin_index].output_files["output"]},
-				input_cache_files = {"input-cache": chunked_inputs}
-			) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
-		)
+		# (input files for next job, dist stat files, parents for next job)
+		likelihood_nodes[None, bin_key] = (inputs, marg_nodes[bin_key].output_files["output"], [marg_nodes[bin_key]])
 
-	# then injections
+	# injection jobs
 	for inj in options.injections:
 		lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
-		bgbin_indices = sorted(lloid_nodes.keys())
-		for n, (outputs, diststats, bgbin_index) in enumerate((lloid_nodes[key], lloid_diststats[key], key) for key in bgbin_indices):
+		for bin_key in sorted(lloid_nodes.keys()):
+			outputs = lloid_nodes[bin_key]
+			diststats = lloid_diststats[bin_key]
 			if outputs is not None:
 				inputs = [o[0] for o in outputs]
 				parents = dagparts.flatten([o[1] for o in outputs])
-				if marg_nodes[bgbin_index]:
-					parents.append(marg_nodes[bgbin_index])
-					likelihood_url = marg_nodes[bgbin_index].output_files["output"]
-				else:
-					likelihood_url = diststats[0]
-
-				# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
-				likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
-					[dagparts.DAGNode(jobs['calcLikelihoodInj'], dag,
-						parent_nodes = parents,
-						opts = {"tmp-space": dagparts.condor_scratch_space()},
-						input_files = {"likelihood-url": likelihood_url},
-						input_cache_files = {"input-cache": chunked_inputs}
-					) for chunked_inputs in dagparts.groups(inputs, chunk_size)]
-				)
-
-	return likelihood_nodes
-
-def merge_in_bin_layer(dag, jobs, options):
-	rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
-	rankpdf_zerolag_nodes = []
-	outnodes = {}
-	if options.num_files_per_background_bin == 1:
-		bgbin_lloid_map = {}
-		# Get list of all files for each background bin (will be same length for each bin)
-		for ce in map(CacheEntry, open(options.lloid_cache)):
-			bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
-
-		if len(bgbin_lloid_map.values()[0]) == 1:
-			# Starting with 1:1 mapping between files and bins,
-			# thus no merging is needed yet
-			outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
-			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-				outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
-
-		else:
-			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-				db = inputs_to_db(jobs, dbs)
-				sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options.cluster_sql_file)
-				outnodes.setdefault(None, []).append(sqlitenode)
-
-			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-				bgbin_lloid_map = {}
-				for ce in map(CacheEntry, open(inj_lloid_cache)):
-					bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
-				for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-					injdb = inputs_to_db(jobs, dbs)
-					sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, inputs, options.injection_sql_file)
-					outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
+				parents.append(marg_nodes[bin_key])
+				likelihood_url = marg_nodes[bin_key].output_files["output"]
+				likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, likelihood_url, parents)
 
-	else:
-		# Starting with output of analysis before naming convention
-		# update (commit 5efd78fee6b371c999f510d07be33ec64f385695),
-		# so all lloid files contain gps times for entire run, and are
-		# numbered by iterating through segments in a given bin first
-		# (e.g. files 0000 to 0009 may all belong to bin 0000, then
-		# files 0010 to 0019 would all belong to bin 0001, etc)
-		for ce_list in dagparts.groups(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
-			noninjdb = cache_to_db(ce_list, jobs)
-			# merge all of the dbs from the same subbank
-			sqlitenode = merge_cluster_layer(dag, jobs, [], noninjdb, [ce.path for ce in ce_list], options)
-			outnodes.setdefault(None, []).append(sqlitenode)
-
-		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-			for ce_list in dagparts.groups(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
-				injdb = cache_to_db(ce_list, jobs)
-				# merge all of the dbs from the same subbank
-				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, [ce.path for ce in ce_list], options)
-				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
-
-	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
+	return likelihood_nodes
 
 def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments):
+	num_chunks = 100
 	innodes = {}
 
 	# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
-	files_to_group = 100
-	for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
-		if inj is None:
-			innode_key = None
-			sql_file = options.cluster_sql_file
-		else:
-			innode_key = sim_tag_from_inj_file(inj)
-			sql_file = options.injection_sql_file
-
-		# Flatten the nodes for this sub bank
-		nodes = dagparts.flatten(nodes)
-		merge_nodes = []
-		# Flatten the input/output files from calc_likelihood
-		inputs = dagparts.flatten([node.input_files["input-cache"] for node in nodes])
-
-		# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
-		for input_files in dagparts.groups(inputs, files_to_group):
-			merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
-				opts = {"sql-file": sql_file, "tmp-space": dagparts.condor_scratch_space()},
-				input_files = {"": input_files}
-				)
+	for (sim_tag, bin_key), (inputs, likelihood_url, parents) in sorted(likelihood_nodes.items()):
+		db = inputs_to_db(jobs, inputs, job_type = 'toSqliteNoCache')
+		xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz")
+		snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file
+		cluster_sql_file = options.cluster_sql_file if sim_tag is None else options.injection_sql_file
+
+		# cluster sub banks
+		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents,
+			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":inputs}
+		)
+
+		# merge sub banks
+		merge_node = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes = [cluster_node],
+			input_files = {"":inputs},
+			output_files = {"output":xml}
+		)
+
+		# cluster and simplify sub banks
+		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [merge_node],
+			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":xml}
+		)
+
+		# assign likelihoods
+		likelihood_node = dagparts.DAGNode(jobs['calcLikelihood'], dag,
+			parent_nodes = [cluster_node],
+			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"likelihood-url":likelihood_url, "": xml}
 			)
-			if options.copy_raw_results:
-				merge_nodes[-1].set_pre_script("store_raw.sh")
-				merge_nodes[-1].add_pre_script_arg(" ".join(input_files))
-
-		# Merging all the dbs from the same sub bank
-		for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
-			db = inputs_to_db(jobs, inputs)
-			sqlitenode = merge_cluster_layer(dag, jobs, merge_nodes, db, inputs, sql_file)
-			innodes.setdefault(innode_key, []).append(sqlitenode)
+
+		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [likelihood_node],
+			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":xml},
+			output_files = {"database":db},
+		)
+		sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
+			opts = {"sql-file": cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":db}
+		)
+
+		innodes.setdefault(sim_tag_from_inj_file(sim_tag) if sim_tag else None, []).append(sqlitenode)
 
 	# make sure outnodes has a None key, even if its value is an empty list
+	# FIXME injection dag is broken
 	innodes.setdefault(None, [])
 
-	num_chunks = 100
-
 	if options.vetoes is None:
 		vetoes = []
 	else:
@@ -1241,7 +1189,7 @@
 		noninjdb = options.non_injection_db
 	else:
 		final_nodes = []
-		for chunk, nodes in enumerate(dagparts.groups(innodes[None], 10)):
+		for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
 			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
 
 			# cluster the final non injection database
@@ -1259,8 +1207,6 @@
 
 		outnodes.append(cpnode)
 
-	# FIXME far-injections currently doesnt work, either fix it or delete it
-	#for injections, far_injections in zip(options.injections, options.far_injections):
 	if options.injections:
 		iterable_injections = options.injections
 	else:
@@ -1302,12 +1248,6 @@
 		injdbs.append(injdb)
 		injxml = injdb.replace('.sqlite','.xml.gz')
 
-		# FIXME far-injections currently doesnt work, either fix it or delete it
-		## If there are injections that are too far away to be seen in a separate file, add them now.
-		#if far_injections is not None:
-		#	xml_input = [injxml] + [far_injections]
-		#else:
-		#	xml_input = injxml
 		xml_input = injxml
 
 		# merge and cluster
@@ -1544,10 +1484,6 @@ if __name__ == '__main__':
 		# calc rank PDF jobs
 		rankpdf_nodes, rankpdf_zerolag_nodes = calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
 
-	else:
-		# Merge lloid files into 1 file per bin if not already 1 file per bin
-		rankpdf_nodes, rankpdf_zerolag_nodes, likelihood_nodes = merge_in_bin_layer(dag, jobs, options)
-
 	# final marginalization step
 	final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes)
 
@@ -1562,7 +1498,7 @@
 	farnode = compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes)
 
 	# make summary plots
-	plotnodes = summary_plot_layer(dag, jobs, farnode, options)
+	plotnodes = summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb)
 
 	# make a web page
 	summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs)
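Note (illustrative sketch, not part of the patch): likelihood_layer now returns a flat dictionary keyed by (sim_tag, bin_key), where the first element is None for the non-injection run, and each value is a tuple of (input trigger files, likelihood URL, parent nodes). sql_cluster_and_merge_layer consumes that mapping directly, clustering, ligolw_add-merging, assigning likelihoods, and converting each entry to SQLite. The snippet below only illustrates the keying convention; the tag, file names, and node labels are hypothetical placeholders.

# Sketch only -- not part of the patch.  Placeholder strings stand in for
# dagparts.DAGNode objects and real T050017-style file names.
likelihood_nodes = {
	(None, "0000"): (["H1L1-0000_LLOID.xml.gz"], "H1L1-0000_MARG_DIST_STATS.xml.gz", ["marg_node_0000"]),
	("BNS_INJ", "0000"): (["H1L1-0000_LLOID_BNS_INJ.xml.gz"], "H1L1-0000_MARG_DIST_STATS.xml.gz", ["marg_node_0000"]),
}

# The consumer walks the mapping deterministically and picks SQL files per
# entry depending on whether sim_tag is None.  The patch uses a plain sorted(),
# which relies on Python 2 ordering None before strings; the explicit key here
# only keeps this sketch runnable on any interpreter.
for (sim_tag, bin_key), (inputs, likelihood_url, parents) in sorted(
		likelihood_nodes.items(), key=lambda kv: (kv[0][0] or "", kv[0][1])):
	sql_file = "simplify_and_cluster.sql" if sim_tag is None else "inj_simplify_and_cluster.sql"
	print("%s bin %s -> %s (%d input files)" % (sim_tag, bin_key, sql_file, len(inputs)))

The job_type argument added to inputs_to_db follows the same pattern: the same input list can be routed to either the 'toSqliteNoCache' or the 'ligolwAdd' job's output path when the merged database and intermediate XML names are constructed.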