diff --git a/gstlal-inspiral/bin/gstlal_inspiral_pipe b/gstlal-inspiral/bin/gstlal_inspiral_pipe
index 195f99729663781d14122c1b8e4194a18e14aabb..a351808bb7c241171bd27ce97fa86dc3ae14c252 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_inspiral_pipe
@@ -98,7 +98,7 @@ def get_bank_params(bank_cache, options, verbose = False):
 	return max_time, template_mchirp_dict
 
 def chunks(l, n):
-	for i in xrange(0, len(l), n):
+	for i in range(0, len(l), n):
 		yield l[i:i+n]
 
 def flatten(lst):
@@ -368,6 +368,38 @@ def parse_command_line():
 #----------------------------------------------------------
 ### DAG utilities
 
+def get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options):
+	"""Calculate the appropriate ht-gate-threshold values according to the scale given
+	"""
+	if options.ht_gate_threshold_linear is not None:
+		# A scale is given
+		mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
+		# use max mchirp in a given svd bank to decide gate threshold
+		bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
+		gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)
+		return [gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
+	elif options.ht_gate_threshold is not None:
+		return [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
+	else:
+		return None
+
+def inputs_to_db(jobs, inputs):
+	dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
+	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
+	return os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
+
+def cache_to_db(cache, jobs):
+	hi_index = cache[-1].description.split('_')[0]
+	db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path))
+	db = db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
+	return db
+
+def get_rank_file(instruments, boundary_seg, n, basename, job=None):
+	if job:
+		return dagparts.T050017_filename(instruments, '_'.join(['%04d'%n, basename]), boundary_seg, '.xml.gz', path = job.output_path)
+	else:
+		return dagparts.T050017_filename(instruments, '_'.join(['%04d'%n, basename]), boundary_seg, '.cache')
+
 def set_up_jobs(options):
 	jobs = {}
 
@@ -622,17 +654,7 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
 				bgbin_chunk_map.setdefault(bgbin, chunk_counter)
 
 			# Calculate the appropriate ht-gate-threshold values according to the scale given
-			threshold_values = None
-			if options.ht_gate_threshold_linear is not None:
-				# A scale is given
-				mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
-				# use max mchirp in a given svd bank to decide gate threshold
-				bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
-				gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)
-				threshold_values = [gate_mchirp_ratio * (bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
-			else:
-				if options.ht_gate_threshold is not None:
-					threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
+			threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)
 
 			# non injection node
 			noninjnode = dagparts.DAGNode(jobs['gstlalInspiral'], dag,
@@ -669,12 +691,15 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
 					"ranking-stat-output-cache":dist_stat_names
 				}
 			)
 
+			# Set a post script to check for file integrity
 			if options.gzip_test:
 				noninjnode.set_post_script("gzip_test.sh")
 				noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))
 
+			# impose a priority to help with depth first submission
 			noninjnode.set_priority(chunk_counter+15)
+
 			inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)
 
 	# process injections
@@ -692,26 +717,16 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
 			try:
 				reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
 				parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
-			except AttributeError:
-				# injection-only run
+			except AttributeError: ### injection-only run
 				reference_psd = psd_nodes[(ifos, seg)]
 				parents = []
 
-			# Calculate the appropriate ht-gate-threshold values according to the scale given
-			threshold_values = None
-			if options.ht_gate_threshold_linear is not None:
-				# A scale is given
-				mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
-				# use max mchirp in a given svd bank to decide gate threshold
-				bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
-				gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)
-				threshold_values = [gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
-			else:
-				if options.ht_gate_threshold is not None:
-					threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
 			svd_files = [CacheEntry.from_T050017(filename) for filename in svd_names]
 			input_cache_name = dagparts.group_T050017_filename_from_T050017_files(svd_files, '.cache').replace('SVD', 'SVD_%s' % sim_name)
 
+			# Calculate the appropriate ht-gate-threshold values according to the scale given
+			threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)
+
 			# setup injection node
 			injnode = dagparts.DAGNode(jobs['gstlalInspiralInj'], dag,
 				parent_nodes = parents,
@@ -749,11 +764,13 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
 			if options.gzip_test:
 				injnode.set_post_script("gzip_test.sh")
 				injnode.add_post_script_arg(" ".join(output_names))
 
+			# impose a priority to help with depth first submission
 			if bgbin_chunk_map:
 				injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1)
 			else:
 				injnode.set_priority(chunk_counter+1)
+
 			inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)
 
 	# Replace mchirplo:mchirphi:inj.xml with inj.xml
@@ -776,11 +793,12 @@ def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_s
 		inj_splitter_node.set_priority(98)
 
 		# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
-		for i in range(num_split_inj_snr_jobs):
+		injection_files = ["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)]
+		for injection_file in injection_files:
 			injSNRnode = dagparts.DAGNode(jobs['gstlalInjSnr'], dag,
 				parent_nodes=ref_psd_parent_nodes + [inj_splitter_node],
-				opts = {"flow":options.flow,"fmax":options.fmax},
+				opts = {"flow": options.flow, "fmax": options.fmax},
 				input_files = {
-					"injection-file": "%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i),
+					"injection-file": injection_file,
 					"reference-psd-cache": "reference_psd.cache"
 				}
 			)
@@ -788,7 +806,7 @@ def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_s
 			inj_snr_nodes.append(injSNRnode)
 
 		addnode = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes=inj_snr_nodes,
-			input_files = {"": ' '.join(["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)])},
+			input_files = {"": ' '.join(injection_files)},
 			output_files = {"output": inj.split(":")[-1]}
 		)
 
@@ -811,6 +829,14 @@ def summary_plot_layer(dag, jobs, parent_nodes, options):
 		"shrink-data-segments": 32.0,
 		"extend-veto-segments": 8.,
 	}
+	sensitivity_opts = {
+		"output-dir":output_dir,
+		"tmp-space":dagparts.condor_scratch_space(),
+		"veto-segments-name":"vetoes",
+		"bin-by-source-type":"",
+		"dist-bins":200,
+		"data-segments-name":"datasegments"
+	}
 
 	### plot summary
 	opts = {"user-tag": "ALL_LLOID_COMBINED", "remove-precession": ""}
@@ -846,30 +872,19 @@ def summary_plot_layer(dag, jobs, parent_nodes, options):
 	))
 
 	### sensitivity plots
+	opts = {"user-tag": "ALL_LLOID_COMBINED"}
+	opts.update(sensitivity_opts)
 	plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode],
-		opts = {
-			"user-tag":"ALL_LLOID_COMBINED",
-			"output-dir":output_dir,
-			"tmp-space":dagparts.condor_scratch_space(),
-			"veto-segments-name":"vetoes",
-			"bin-by-source-type":"",
-			"dist-bins":200,
-			"data-segments-name":"datasegments"
-		},
-		input_files = {"zero-lag-database":noninjdb, "":injdbs}
+		opts = opts,
+		input_files = {"zero-lag-database": noninjdb, "": injdbs}
 	))
+
 	for injdb in injdbs:
+		opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1]}
+		opts.update(sensitivity_opts)
 		plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode],
-			opts = {
-				"user-tag": injdb.replace(".sqlite","").split("-")[1],
-				"output-dir":output_dir,
-				"tmp-space":dagparts.condor_scratch_space(),
-				"veto-segments-name":"vetoes",
-				"bin-by-source-type":"",
-				"dist-bins":200,
-				"data-segments-name":"datasegments"
-			},
-			input_files = {"zero-lag-database":noninjdb, "":injdb}
+			opts = opts,
+			input_files = {"zero-lag-database": noninjdb, "": injdb}
 		))
 
 	### background plots
@@ -950,8 +965,9 @@ def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, llo
 	# first non-injections, which will get skipped if this is an injections-only run
 	for n, (outputs, diststats) in enumerate((lloid_output[None][key], lloid_diststats[key]) for key in sorted(lloid_output[None].keys())):
 		inputs = [o[0] for o in outputs]
-		parents = []
-		[parents.extend(o[1]) for o in outputs]
+		parents = flatten([o[1] for o in outputs])
+		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, n)
+
 		# FIXME we keep this here in case we someday want to have a
 		# mass bin dependent prior, but it really doesn't matter for
 		# the time being.
@@ -959,7 +975,7 @@ def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, llo
 			parent_nodes = [one_ifo_svd_nodes[n]] + mass_model_add_node,
 			opts = {"instrument":instrument_set, "background-prior":1, "min-instruments":options.min_instruments},
 			input_files = {"svd-file":one_ifo_svd_nodes[n].output_files["write-svd"], "mass-model-file":mass_model_file},
-			output_files = {"write-likelihood":dagparts.T050017_filename(instruments, '%04d_CREATE_PRIOR_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = jobs['createPriorDistStats'].output_path)}
+			output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=jobs['createPriorDistStats'])}
 		)
 		# Create a file that has the priors *and* all of the diststats
 		# for a given bin marginalized over time. This is all that will
@@ -968,22 +984,22 @@ def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, llo
 			parent_nodes = [priornode] + parents,
 			opts = {"marginalize":"ranking-stat"},
 			input_cache_files = {"likelihood-cache":diststats + [priornode.output_files["write-likelihood"]]},
-			output_files = {"output":dagparts.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = jobs['marginalize'].output_path)},
-			input_cache_file_name = dagparts.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.cache')
+			output_files = {"output": rankfile('MARG_DIST_STATS', job=jobs['marginalize'])},
+			input_cache_file_name = rankfile('MARG_DIST_STATS')
 		)
 
 		calcranknode = dagparts.DAGNode(jobs['calcRankPDFs'], dag,
 			parent_nodes = [diststats_per_bin_node],
 			opts = {"ranking-stat-samples":options.ranking_stat_samples},
 			input_files = {"":diststats_per_bin_node.output_files["output"]},
-			output_files = {"output":dagparts.T050017_filename(instruments, '%04d_CALC_RANK_PDFS' % (n,), boundary_seg, '.xml.gz', path = jobs['calcRankPDFs'].output_path)}
+			output_files = {"output": rankfile('CALC_RANK_PDFS', job=jobs['calcRankPDFs'])}
 		)
 
 		calcrankzerolagnode = dagparts.DAGNode(jobs['calcRankPDFsWithZerolag'], dag,
 			parent_nodes = [diststats_per_bin_node],
 			opts = {"add-zerolag-to-background":"","ranking-stat-samples":options.ranking_stat_samples},
 			input_files = {"":diststats_per_bin_node.output_files["output"]},
-			output_files = {"output":dagparts.T050017_filename(instruments, '%04d_CALC_RANK_PDFS_WZL' % (n,), boundary_seg, '.xml.gz', path = jobs['calcRankPDFsWithZerolag'].output_path)}
+			output_files = {"output": rankfile('CALC_RANK_PDFS_WZL', job=jobs['calcRankPDFsWithZerolag'])}
 		)
 
 		margnodes['%04d' %(n,)] = diststats_per_bin_node
@@ -1003,26 +1019,25 @@ def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, llo
 	# then injections
 	for inj in options.injections:
 		lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
-		for n, (outputs, diststats, bgbin_index) in enumerate(((lloid_nodes[key], lloid_diststats[key], key) for key in sorted(lloid_nodes.keys()))):
-			if outputs is None:
-				continue
-			inputs = [o[0] for o in outputs]
-			parents = []
-			[parents.extend(o[1]) for o in outputs]
-			if margnodes:
-				parents.append(margnodes[bgbin_index])
-				likelihood_url = margnodes[bgbin_index].output_files["output"]
-			else:
-				likelihood_url = diststats[0]
-			# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
-			likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
-				[dagparts.DAGNode(jobs['calcLikelihoodInj'], dag,
-					parent_nodes = parents,
-					opts = {"tmp-space":dagparts.condor_scratch_space()},
-					input_files = {"likelihood-url":likelihood_url},
{"likelihood-url":likelihood_url}, - input_cache_files = {"input-cache":chunked_inputs} - ) for chunked_inputs in chunks(inputs, 16)] - ) + for outputs, diststats, bgbin_index in ((lloid_nodes[key], lloid_diststats[key], key) for key in sorted(lloid_nodes.keys())): + if outputs is not None: + inputs = [o[0] for o in outputs] + parents = flatten([o[1] for o in outputs]) + if margnodes: + parents.append(margnodes[bgbin_index]) + likelihood_url = margnodes[bgbin_index].output_files["output"] + else: + likelihood_url = diststats[0] + + # Break up the likelihood jobs into chunks to process fewer files, e.g., 16 + likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append( + [dagparts.DAGNode(jobs['calcLikelihoodInj'], dag, + parent_nodes = parents, + opts = {"tmp-space":dagparts.condor_scratch_space()}, + input_files = {"likelihood-url":likelihood_url}, + input_cache_files = {"input-cache":chunked_inputs} + ) for chunked_inputs in chunks(inputs, 16)] + ) # after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run @@ -1035,23 +1050,20 @@ def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, llo inputs = flatten([node.input_files["input-cache"] for node in nodes]) # files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run - for n in range(0, len(inputs), files_to_group): + for input_files in chunks(inputs, files_to_group): merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes, - opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, - input_files = {"":inputs[n:n+files_to_group]} + opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()}, + input_files = {"": input_files} ) ) if options.copy_raw_results: merge_nodes[-1].set_pre_script("store_raw.sh") - merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group])) + merge_nodes[-1].add_pre_script_arg(" ".join(input_files)) # Merging all the dbs from the same sub bank outnode_key = sim_tag_from_inj_file(inj) if inj is not None else None for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]): - dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs] - db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite') - db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db) - + db = inputs_to_db(jobs, inputs) sqlitenode = merge_cluster_layer(dag, jobs, merge_nodes, db, inputs, options) outnodes.setdefault(outnode_key, []).append(sqlitenode) @@ -1079,10 +1091,7 @@ def merge_in_bin_layer(dag, jobs, options): else: for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)): - dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs] - db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite') - db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db) - + db = inputs_to_db(jobs, dbs) sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options) outnodes.setdefault(None, []).append(sqlitenode) @@ -1092,10 +1101,7 @@ def merge_in_bin_layer(dag, jobs, options): bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path) for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)): - 
-				dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-				db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
-				db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
-
+				db = inputs_to_db(jobs, dbs)
 				sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options)
 				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
 
@@ -1107,18 +1113,14 @@ def merge_in_bin_layer(dag, jobs, options):
 		# (e.g. files 0000 to 0009 may all belong to bin 0000, then
 		# files 0010 to 0019 would all belong to bin 0001, etc)
 		for ce_list in chunks(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
-			hi_index = ce_list[-1].description.split('_')[0]
-			noninjdb = os.path.join(jobs['toSqlite'].output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
-
+			noninjdb = cache_to_db(ce_list, jobs)
 			# merge all of the dbs from the same subbank
 			sqlitenode = merge_cluster_layer(dag, jobs, [], noninjdb, [ce.path for ce in ce_list], options)
 			outnodes.setdefault(None, []).append(sqlitenode)
 
 		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
 			for ce_list in chunks(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
-				hi_index = ce_list[-1].description.split('_')[0]
-				injdb = os.path.join(jobs['toSqlite'].output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
-
+				injdb = cache_to_db(ce_list, jobs)
 				# merge all of the dbs from the same subbank
 				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, [ce.path for ce in ce_list], options)
 				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
@@ -1285,25 +1287,28 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
 	return injdbs, noninjdb, outnodes, dbs_to_delete
 
 def compute_fap_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, injdbs, noninjdb, final_sqlite_nodes):
-	# compute FAPs and FARs
-	# split up the marginalization into groups of 10
+	"""compute FAPs and FARs
+	"""
 	try:
 		margin = [node.output_files["output"] for node in rankpdf_nodes]
 		parents = rankpdf_nodes
 		margin_zerolag = [node.output_files["output"] for node in rankpdf_zerolag_nodes]
 		parents_zerolag = rankpdf_zerolag_nodes
-	except AttributeError:
-		# analysis started at merger step
+	except AttributeError: ### analysis started at merger step
 		margin = rankpdf_nodes
 		parents = []
 		margin_zerolag = rankpdf_zerolag_nodes
 		parents_zerolag = []
+
 	margout = []
 	margzerolagout = []
 	margnodes = []
 	margzerolagnodes = []
 	margnum = 16
-	for i,n in enumerate(range(0, len(margin), margnum)):
+
+	# split up the marginalization into groups of margnum files
+	# (margnum is currently 16)
+	for n in range(0, len(margin), margnum):
 		margfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin[n:n+margnum]]
 		margout.append(dagparts.group_T050017_filename_from_T050017_files(margfiles, '.xml.gz', path = jobs['marginalize'].output_path))
 		margnodes.append(dagparts.DAGNode(jobs['marginalize'], dag, parent_nodes = parents,