diff --git a/gstlal-inspiral/bin/gstlal_inspiral_pipe b/gstlal-inspiral/bin/gstlal_inspiral_pipe
index 17e00545d177d53ce5a1d6b41dbff994e3be5603..30c9a989bd3cb2b615ff1024a2cf40b1e248eb90 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_inspiral_pipe
@@ -807,11 +807,28 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq
 		noninjdb = options.non_injection_db
 	else:
+		final_nodes = []
+		for chunk, nodes in enumerate(chunks(innodes[None], 10)):
+			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
+			sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = nodes,
+				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
+				input_cache_files = {"input-cache": [node.input_files[""] for node in nodes]},
+				output_files = {"database":noninjdb},
+				input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
+			)
+
+			# cluster the final non injection database
+			noninjsqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
+				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+				input_files = {"":noninjdb}
+			)
+			final_nodes.append(noninjsqlitenode)
+
 		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
-		sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = chunk_nodes,
+		sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = final_nodes,
 			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
 			input_files = {"": (vetoes + [options.frame_segments_file])},
-			input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
+			input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
 			output_files = {"database":noninjdb},
 			input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
 		)
@@ -869,6 +886,26 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq
 		dbs_to_delete.append(injdb)
+
+		final_nodes = []
+		for chunk, injnodes in enumerate(chunks(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
+			# Setup the final output names, etc.
+			injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
+
+			# merge
+			sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = injnodes,
+				opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
+				input_cache_files = {"input-cache": [node.input_files[""] for node in injnodes]},
+				output_files = {"database":injdb},
+				input_cache_file_name = injdb.replace('.sqlite','.cache')
+			)
+
+			# cluster
+			clusternode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
+				opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+				input_files = {"":injdb}
+			)
+			final_nodes.append(clusternode)
+
 		# Setup the final output names, etc.
 		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
 		injdbs.append(injdb)
@@ -885,10 +922,10 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq
 		xml_input = injxml

 		# merge
-		sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = chunk_nodes + ligolw_add_nodes,
+		sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = final_nodes + ligolw_add_nodes,
 			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
 			input_files = {"": (vetoes + [options.frame_segments_file, injections])},
-			input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
+			input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
 			output_files = {"database":injdb},
 			input_cache_file_name = injdb.replace('.sqlite','.cache')
 		)
@@ -1338,10 +1375,20 @@ elif options.reference_psd is None:
 	#
 	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache

+	median_psd_nodes = []
+	for chunk, nodes in enumerate(chunks(psd_nodes.values(), 50)):
+		median_psd_node = \
+			dagparts.DAGNode(medianPSDJob, dag,
+				parent_nodes = psd_nodes.values(),
+				input_files = {"": [node.output_files["write-psd"] for node in nodes]},
+				output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD_CHUNK_%04d" % chunk, boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))}
+			)
+		median_psd_nodes.append(median_psd_node)
+
 	median_psd_node = \
 		dagparts.DAGNode(medianPSDJob, dag,
-			parent_nodes = psd_nodes.values(),
-			input_files = {"input-cache": "reference_psd.cache"},
+			parent_nodes = median_psd_nodes,
+			input_files = {"": [node.output_files["output-name"] for node in median_psd_nodes]},
 			output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))}
 		)
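The new merge/cluster and median-PSD loops all rely on a chunks() helper (and, for the injection runs, a num_chunks value) whose definitions are not part of this patch. The sketch below is only an illustration of the grouping behaviour those loops assume, namely that chunks(seq, n) yields consecutive groups of at most n items; the helper actually used by gstlal_inspiral_pipe may differ in name or detail.

# Hypothetical stand-in for the chunks() helper referenced by this patch;
# the real definition lives outside this diff.
def chunks(seq, n):
	"""Yield consecutive groups of at most n items from seq."""
	seq = list(seq)
	for i in range(0, len(seq), n):
		yield seq[i:i + n]

if __name__ == "__main__":
	# With 25 per-segment databases and n = 10 this produces three
	# PART_LLOID chunk merges (10 + 10 + 5); their clustered outputs
	# then feed the single final ALL_LLOID merge, giving a two-stage
	# reduction instead of one ligolw_sqlite job with 25 inputs.
	for chunk, nodes in enumerate(chunks(range(25), 10)):
		print(chunk, len(nodes))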