diff --git a/gstlal-inspiral/bin/gstlal_inspiral_dag b/gstlal-inspiral/bin/gstlal_inspiral_dag
index af09299946996b48e4e3fb426477d39e1fcb1b68..7c0be6d1ea31d9bf895ef5d5decf42483002aebe 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_dag
+++ b/gstlal-inspiral/bin/gstlal_inspiral_dag
@@ -265,7 +265,6 @@ def parse_command_line():
 	# FIXME far-injections currently doesnt work, either fix it or delete it
 	#parser.add_option("--far-injections", action = "append", help = "Injection files with injections too far away to be seen and are not filtered. Required. See https://www.lsc-group.phys.uwm.edu/ligovirgo/cbcnote/NSBH/MdcInjections/MDC1 for example.")
 	parser.add_option("--singles-threshold", default=float("inf"), action = "store", metavar="THRESH", help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
-	parser.add_option("--copy-raw-results", default=False, action = "store_true", help = "Copy raw gstlal_inspiral results before applying clustering and other lossy operations.")
 	parser.add_option("--gzip-test", default=False, action = "store_true", help = "Perform gzip --test on all output files.")
 	parser.add_option("--verbose", action = "store_true", help = "Be verbose")
 	parser.add_option("--disable-calc-inj-snr", default=False, action = "store_true", help = "Disable injection SNR calculation")
@@ -287,7 +286,6 @@ def parse_command_line():
 	parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
 	parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
 	parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
-	parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
 	parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
 
 	# Condor commands
@@ -394,12 +392,6 @@ def inputs_to_db(jobs, inputs):
 	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
 	return os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
 
-def cache_to_db(cache, jobs):
-	hi_index = cache[-1].description.split('_')[0]
-	db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path))
-	db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
-	return db
-
 def get_rank_file(instruments, boundary_seg, n, basename, job=None):
 	if job:
 		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path)
@@ -460,6 +452,7 @@ def set_up_jobs(options):
 	jobs['injSplitter'] = dagparts.DAGJob("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = base_condor_commands)
 	jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
 	jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
+	jobs['mergeAndReduce'] = dagparts.DAGJob("gstlal_inspiral_merge_and_reduce", condor_commands = base_condor_commands)
 	jobs['calcLikelihoodInj'] = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands=base_condor_commands)
 	jobs['ComputeFarFromSnrChisqHistograms'] = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = base_condor_commands)
 	jobs['ligolwInspinjFind'] = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = base_condor_commands)
@@ -791,7 +784,7 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
 	# NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
 	lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)
 
-	return inspiral_nodes, lloid_output, lloid_diststats
+	return lloid_output, lloid_diststats
 
 def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs):
 	ligolw_add_nodes = []
@@ -956,29 +949,6 @@ def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
 	else:
 		return [], options.mass_model_file
 
-def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None):
-	"""merge and cluster from sqlite database
-	"""
-	if input_files:
-		input_ = {"": input_files}
-	else:
-		input_ = {}
-
-	# Merge database into chunks
-	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
-		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-		input_files = input_,
-		input_cache_files = {"input-cache": db_cache},
-		output_files = {"database":db},
-		input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
-	)
-
-	# cluster database
-	return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-		opts = {"sql-file": sqlfile, "tmp-space": dagparts.condor_scratch_space()},
-		input_files = {"": db}
-	)
-
 def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file):
 	instruments = "".join(sorted(instrument_set))
 	margnodes = {}
@@ -1045,14 +1015,13 @@ def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument
 
 	return rankpdf_nodes, rankpdf_zerolag_nodes
 
-def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
+def likelihood_layer(dag, jobs, marg_nodes, final_sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set):
 	likelihood_nodes = {}
 	instruments = "".join(sorted(instrument_set))
 	chunk_size = 16
 
 	bgbin_indices = sorted(lloid_output[None].keys())
 	for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
-		rankfile = functools.partial(get_rank_file, instruments, boundary_seg, '%04d'%n)
 		inputs = [o[0] for o in outputs]
 
 		# Break up the likelihood jobs into chunks to process fewer files, e.g, 16
@@ -1095,232 +1064,112 @@ def merge_in_bin_layer(dag, jobs, options):
 	rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
 	rankpdf_zerolag_nodes = []
 	outnodes = {}
-	if options.num_files_per_background_bin == 1:
-		bgbin_lloid_map = {}
-		# Get list of all files for each background bin (will be same length for each bin)
-		for ce in map(CacheEntry, open(options.lloid_cache)):
-			bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
-
-		if len(bgbin_lloid_map.values()[0]) == 1:
-			# Starting with 1:1 mapping between files and bins,
-			# thus no merging is needed yet
-			outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
-			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-				outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
-
-		else:
-			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-				db = inputs_to_db(jobs, dbs)
-				sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options.cluster_sql_file)
-				outnodes.setdefault(None, []).append(sqlitenode)
-
-			for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-				bgbin_lloid_map = {}
-				for ce in map(CacheEntry, open(inj_lloid_cache)):
-					bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
-
-				for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
-					injdb = inputs_to_db(jobs, dbs)
-					sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, inputs, options.injection_sql_file)
-					outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
+	bgbin_lloid_map = {}
+	# Get list of all files for each background bin (will be same length for each bin)
+	for ce in map(CacheEntry, open(options.lloid_cache)):
+		bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+
+	if len(bgbin_lloid_map.values()[0]) == 1:
+		# Starting with 1:1 mapping between files and bins,
+		# thus no merging is needed yet
+		outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
+		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
+			outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
 	else:
-		# Starting with output of analysis before naming convention
-		# update (commit 5efd78fee6b371c999f510d07be33ec64f385695),
-		# so all lloid files contain gps times for entire run, and are
-		# numbered by iterating through segments in a given bin first
-		# (e.g. files 0000 to 0009 may all belong to bin 0000, then
-		# files 0010 to 0019 would all belong to bin 0001, etc)
-		for ce_list in dagparts.groups(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
-			noninjdb = cache_to_db(ce_list, jobs)
-			# merge all of the dbs from the same subbank
-			sqlitenode = merge_cluster_layer(dag, jobs, [], noninjdb, [ce.path for ce in ce_list], options)
+		for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
+			db = inputs_to_db(jobs, dbs)
+			sqlitenode = merge_cluster_layer(dag, jobs, [], db, inputs, options.cluster_sql_file)
 			outnodes.setdefault(None, []).append(sqlitenode)
 
 		for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
-			for ce_list in dagparts.groups(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
-				injdb = cache_to_db(ce_list, jobs)
-				# merge all of the dbs from the same subbank
-				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, [ce.path for ce in ce_list], options)
+			bgbin_lloid_map = {}
+			for ce in map(CacheEntry, open(inj_lloid_cache)):
+				bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
+
+			for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
+				injdb = inputs_to_db(jobs, dbs)
+				sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, inputs, options.injection_sql_file)
 				outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
 
 	return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
 
-def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments):
-	innodes = {}
-
-	# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
-	files_to_group = 40
-	for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
-		if inj is None:
-			innode_key = None
-			sql_file = options.cluster_sql_file
-		else:
-			innode_key = sim_tag_from_inj_file(inj)
-			sql_file = options.injection_sql_file
-
-		# Flatten the nodes for this sub bank
-		nodes = dagparts.flatten(nodes)
-		merge_nodes = []
-		# Flatten the input/output files from calc_likelihood
-		inputs = dagparts.flatten([node.input_files["input-cache"] for node in nodes])
-
-		# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
-		for input_files in dagparts.groups(inputs, files_to_group):
-			merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
-				opts = {"sql-file": sql_file, "tmp-space": dagparts.condor_scratch_space()},
-				input_files = {"": input_files}
-				)
-			)
-			if options.copy_raw_results:
-				merge_nodes[-1].set_pre_script("store_raw.sh")
-				merge_nodes[-1].add_pre_script_arg(" ".join(input_files))
-
-		# Merging all the dbs from the same sub bank
-		for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
-			db = inputs_to_db(jobs, inputs)
-			sqlitenode = merge_cluster_layer(dag, jobs, merge_nodes, db, inputs, sql_file)
-			innodes.setdefault(innode_key, []).append(sqlitenode)
-
-	# make sure outnodes has a None key, even if its value is an empty list
-	innodes.setdefault(None, [])
-
-	num_chunks = 50
-
-	if options.vetoes is None:
-		vetoes = []
-	else:
-		vetoes = [options.vetoes]
-
-	chunk_nodes = []
+def cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg):
+	"""cluster and merge lloid output into a single database
+	"""
+	sqlite_nodes = {}
 	dbs_to_delete = []
-	# Process the chirp mass bins in chunks to paralellize the merging process
-	for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
-		try:
-			dbs = [node.input_files[""] for node in nodes]
-			parents = nodes
+	instruments = "".join(sorted(instrument_set))
+	chunk_size = 10
 
-		except AttributeError:
-			# analysis started at merger step but seeded by lloid files which
-			# have already been merged into one file per background
-			# bin, thus the analysis will begin at this point
-			dbs = nodes
-			parents = []
+	# define lloid files
+	bgbin_indices = sorted(lloid_output[None].keys())
+	output_lloid = [(file_, node) for key in bgbin_indices for file_, node in lloid_output[None][key]]
+
+	### cluster and merge
+	while len(output_lloid) > 1:
+		input_lloid = output_lloid
+		output_lloid = []
+		for lloid_chunk in dagparts.groups(input_lloid, chunk_size):
+			cache, nodes = zip(*lloid_chunk)
+			nodes = list(set(dagparts.flatten(nodes)))
+
+			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in cache]
+			noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['mergeAndReduce'].output_path)
+
+			# Break up the merge/reduce jobs into chunks to process fewer files, e.g, 10
+			mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
+				parent_nodes = nodes,
+				opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
+				input_files = {},
+				input_cache_files = {"input-cache": cache},
+				output_files = {"database": noninjdb},
+			)
 
-		dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-		noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
+			dbs_to_delete.append(noninjdb)
+			output_lloid.append((noninjdb, [mergenode]))
 
-		# Merge and cluster the final non injection database
-		noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file)
-		chunk_nodes.append(noninjsqlitenode)
-		dbs_to_delete.append(noninjdb)
+	sqlite_nodes[None] = [o[1] for o in output_lloid]
 
-	# Merge the final non injection database
-	outnodes = []
+	# then injections
 	injdbs = []
-	if options.non_injection_db: #### injection-only run
-		noninjdb = options.non_injection_db
-	else:
-		final_nodes = []
-		for chunk, nodes in enumerate(dagparts.groups(innodes[None], 10)):
-			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
-
-			# cluster the final non injection database
-			noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
-			final_nodes.append(noninjsqlitenode)
-
-		input_files = (vetoes + [options.frame_segments_file])
-		input_cache_files = [node.input_files[""] for node in final_nodes]
-		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
-		noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
-
-		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode],
-			input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
-		)
-
-		outnodes.append(cpnode)
-
-	# FIXME far-injections currently doesnt work, either fix it or delete it
-	#for injections, far_injections in zip(options.injections, options.far_injections):
-	if options.injections:
-		iterable_injections = options.injections
-	else:
-		iterable_injections = options.injections_for_merger
-
-	for injections in iterable_injections:
-		# extract only the nodes that were used for injections
-		chunk_nodes = []
-
-		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
-			try:
-				dbs = [injnode.input_files[""] for injnode in injnodes]
-				parents = injnodes
-			except AttributeError:
-				dbs = injnodes
-				parents = []
-
-			# Setup the final output names, etc.
-			dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
-			injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
-
-			# merge and cluster
-			clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file)
-			chunk_nodes.append(clusternode)
-			dbs_to_delete.append(injdb)
-
-
-		final_nodes = []
-		for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
-			# Setup the final output names, etc.
-			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
+	for inj in options.injections:
+		sim_tag = sim_tag_from_inj_file(inj)
+
+		# define lloid files
+		bgbin_indices = sorted(lloid_output[sim_tag].keys())
+		output_lloid = [(file_, node) for key in bgbin_indices for file_, node in lloid_output[sim_tag][key]]
+
+		### cluster and merge
+		while len(output_lloid) > 1:
+			input_lloid = output_lloid
+			output_lloid = []
+			for lloid_chunk in dagparts.groups(input_lloid, chunk_size):
+				cache, nodes = zip(*lloid_chunk)
+				nodes = list(set(dagparts.flatten(nodes)))
+
+				dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in cache]
+				injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['mergeAndReduce'].output_path)
+
+				# Break up the merge/reduce jobs into chunks to process fewer files, e.g, 10
+				mergenode = dagparts.DAGNode(jobs['mergeAndReduce'], dag,
+					parent_nodes = nodes,
+					opts = {"sql-file": options.injection_sql_file, "tmp-space": dagparts.condor_scratch_space()},
+					input_files = {},
+					input_cache_files = {"input-cache": cache},
+					output_files = {"database": injdb},
+				)
 
-			# merge and cluster
-			clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.cluster_sql_file)
-			final_nodes.append(clusternode)
+				dbs_to_delete.append(injdb)
+				output_lloid.append((injdb, [mergenode]))
 
-		# Setup the final output names, etc.
-		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
+		sqlite_nodes[sim_tag] = [o[1] for o in output_lloid]
 		injdbs.append(injdb)
-		injxml = injdb.replace('.sqlite','.xml.gz')
-
-		# FIXME far-injections currently doesnt work, either fix it or delete it
-		## If there are injections that are too far away to be seen in a separate file, add them now.
-		#if far_injections is not None:
-		#	xml_input = [injxml] + [far_injections]
-		#else:
-		#	xml_input = injxml
-		xml_input = injxml
-
-		# merge and cluster
-		parent_nodes = final_nodes + ligolw_add_nodes
-		input_files = (vetoes + [options.frame_segments_file, injections])
-		input_cache_files = [node.input_files[""] for node in final_nodes]
-		clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
-
-		clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
-			opts = {"tmp-space":dagparts.condor_scratch_space()},
-			output_files = {"extract":injxml},
-			input_files = {"database":injdb}
-		)
-
-		inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode],
-			opts = {"time-window":0.9},
-			input_files = {"":injxml}
-		)
-		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode],
-			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-			output_files = {"database":injdb},
-			input_files = {"":xml_input}
-		)
-
-		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode],
-			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
-		)
-
-		outnodes.append(cpnode)
+	# remove final databases from databases considered for deletion
+	dbs_to_delete = [db for db in dbs_to_delete if db not in injdbs and db != noninjdb]
 
-	return injdbs, noninjdb, outnodes, dbs_to_delete
+	return sqlite_nodes, injdbs, noninjdb, dbs_to_delete
 
 def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 	ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes]
@@ -1370,19 +1219,20 @@ def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes):
 
 	return final_margnodes, dagparts.flatten(all_margcache)
 
-def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes):
+def compute_far_layer(dag, jobs, likelihood_nodes, final_margnodes, injdbs, noninjdb):
 	"""compute FAPs and FARs
 	"""
+	all_likelihood_nodes = dagparts.flatten(dagparts.flatten(likelihood_nodes.values()))
 	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
 	filesuffixs = ['', '_with_zerolag']
 
-	for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
+	for margnode, margfile, filesuffix in zip(final_margnodes, margfiles, filesuffixs):
 		if options.marginalized_likelihood_file: ### injection-only run
-			parents = final_sqlite_nodes
+			parents = all_likelihood_nodes
 			marginalized_likelihood_file = margfile
 
 		else:
-			parents = [margnode] + final_sqlite_nodes
+			parents = [margnode] + all_likelihood_nodes
 			marginalized_likelihood_file = margnode.output_files["output"]
 
 		farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents,
@@ -1514,7 +1364,7 @@ if __name__ == '__main__':
 
 	if not options.lloid_cache:
 		# Inspiral jobs by segment
-		inspiral_nodes, lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
+		lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
 
 		# marginalize jobs
 		marg_nodes = marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file)
@@ -1524,20 +1374,20 @@ if __name__ == '__main__':
 
 	else:
 		# Merge lloid files into 1 file per bin if not already 1 file per bin
-		rankpdf_nodes, rankpdf_zerolag_nodes, likelihood_nodes = merge_in_bin_layer(dag, jobs, options)
+		lloid_output = retrieve_lloid_output(dag, jobs, options)
+		rankpdf_nodes, rankpdf_zerolag_nodes, marg_nodes = merge_in_bin_layer(dag, jobs, options)
+
+	# Setup clustering and/or merging
+	sqlite_nodes, injdbs, noninjdb, dbs_to_delete = cluster_and_merge_layer(dag, jobs, lloid_output, options, instruments, boundary_seg)
 
 	# final marginalization step
 	final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes)
 
 	# likelihood jobs
-	likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set)
-
-	# Setup clustering and/or merging
-	# after all of the likelihood ranking and preclustering is finished put everything into single databases based on the injection file (or lack thereof)
-	injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments)
+	likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, sqlite_nodes, injdbs, noninjdb, options, boundary_seg, instrument_set)
 
 	# Compute FAR
-	farnode = compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes)
+	farnode = compute_far_layer(dag, jobs, likelihood_nodes, final_marg_nodes, injdbs, noninjdb)
 
 	# make summary plots
	plotnodes = summary_plot_layer(dag, jobs, farnode, options)
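
Reviewer note (not part of the patch): the new cluster_and_merge_layer replaces the one-shot sql_cluster_and_merge_layer with an iterative reduction. On each pass the current set of databases is grouped into chunks of chunk_size (10), one gstlal_inspiral_merge_and_reduce job is queued per chunk, and the merged outputs are fed back in until a single database per tag remains. Below is a minimal stand-alone sketch of that pattern on plain lists, assuming dagparts.groups simply yields fixed-size chunks; the names groups, iterative_merge and merge_chunk are illustrative only and do not appear in the patch.

	# Sketch of the chunked, multi-pass merge that cluster_and_merge_layer
	# expresses as DAG nodes. merge_chunk stands in for one
	# gstlal_inspiral_merge_and_reduce job.

	def groups(items, n):
		"""Yield successive chunks of at most n items (assumed behaviour of dagparts.groups)."""
		for i in range(0, len(items), n):
			yield items[i:i + n]

	def iterative_merge(databases, chunk_size, merge_chunk):
		"""Repeatedly merge chunk_size databases at a time until one remains."""
		outputs = list(databases)
		while len(outputs) > 1:
			inputs, outputs = outputs, []
			for chunk in groups(inputs, chunk_size):
				# in the DAG, each call here is a separate merge/reduce job whose
				# parents are the jobs that produced the chunk's databases
				outputs.append(merge_chunk(chunk))
		return outputs[0]

	if __name__ == "__main__":
		final = iterative_merge(["db%04d.sqlite" % i for i in range(25)], 10,
			lambda chunk: "merged(%d)" % len(chunk))
		print(final)

With 25 input databases and chunk_size = 10 this runs two passes (25 -> 3 -> 1), which is the same fan-in that the DAG encodes through the parent_nodes links of the mergeAndReduce jobs.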