Commit 8e2c0dbf authored by chad.hanna

gstlal_inspiral_pipe: remove bottleneck at median psd and final db merge stages

parent 0a2a20f2
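This change replaces merge jobs whose parent list was every upstream node with a two-stage reduction: the inputs are split into fixed-size chunks (10 partial databases per chunk, 50 PSDs per chunk in the hunks below), one intermediate job merges each chunk, and the final job then merges only those intermediate outputs. A minimal sketch of that reduction pattern follows; the real pipeline builds Condor DAG nodes with dagparts.DAGNode, and chunks(), merge_chunk() and merge_final() here are illustrative stand-ins rather than the gstlal helpers.

# Minimal sketch of the two-stage (chunked) reduction used in this commit.
# Plain functions stand in for the Condor jobs the pipeline actually builds,
# so only the fan-in structure is shown.

def chunks(seq, n):
    """Yield successive lists of at most n items from seq."""
    seq = list(seq)
    for i in range(0, len(seq), n):
        yield seq[i:i + n]

def merge_chunk(inputs):
    # stand-in for one intermediate merge job (e.g. a partial sqlite merge
    # or a per-chunk median PSD)
    return "merged(%d inputs)" % len(inputs)

def merge_final(partials):
    # stand-in for the final merge job; its fan-in is now the number of
    # chunks rather than the number of upstream jobs
    return "final(%d partials)" % len(partials)

upstream_outputs = ["out_%04d.sqlite" % i for i in range(1000)]

# stage 1: one intermediate job per chunk of 10 inputs
partials = [merge_chunk(c) for c in chunks(upstream_outputs, 10)]

# stage 2: a single job over the 100 partial results instead of 1000 inputs
result = merge_final(partials)
print(len(partials), result)   # 100 final(100 partials)

In the diff itself each per-chunk partial merge is additionally clustered with lalapps_run_sqlite before its database feeds the final merge, so the final job's inputs are already reduced.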
@@ -807,11 +807,28 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq
         noninjdb = options.non_injection_db
     else:
+        final_nodes = []
+        for chunk, nodes in enumerate(chunks(innodes[None], 10)):
+            noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
+            sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = nodes,
+                opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
+                input_cache_files = {"input-cache": [node.input_files[""] for node in nodes]},
+                output_files = {"database":noninjdb},
+                input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
+            )
+            # cluster the final non injection database
+            noninjsqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
+                opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+                input_files = {"":noninjdb}
+            )
+            final_nodes.append(noninjsqlitenode)
         noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
-        sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = chunk_nodes,
+        sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = final_nodes,
             opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
             input_files = {"": (vetoes + [options.frame_segments_file])},
-            input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
+            input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
             output_files = {"database":noninjdb},
             input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
         )
@@ -869,6 +886,26 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq
         dbs_to_delete.append(injdb)
+        final_nodes = []
+        for chunk, injnodes in enumerate(chunks(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
+            # Setup the final output names, etc.
+            injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
+            # merge
+            sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = injnodes,
+                opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
+                input_cache_files = {"input-cache": [node.input_files[""] for node in injnodes]},
+                output_files = {"database":injdb},
+                input_cache_file_name = injdb.replace('.sqlite','.cache')
+            )
+            # cluster
+            clusternode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
+                opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+                input_files = {"":injdb}
+            )
+            final_nodes.append(clusternode)
         # Setup the final output names, etc.
         injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
         injdbs.append(injdb)
@@ -885,10 +922,10 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq
             xml_input = injxml
         # merge
-        sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = chunk_nodes + ligolw_add_nodes,
+        sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = final_nodes + ligolw_add_nodes,
             opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""},
             input_files = {"": (vetoes + [options.frame_segments_file, injections])},
-            input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
+            input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
             output_files = {"database":injdb},
             input_cache_file_name = injdb.replace('.sqlite','.cache')
         )
@@ -1338,10 +1375,20 @@ elif options.reference_psd is None:
     #
     # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
+    median_psd_nodes = []
+    for chunk, nodes in enumerate(chunks(psd_nodes.values(), 50)):
+        median_psd_node = \
+            dagparts.DAGNode(medianPSDJob, dag,
+                parent_nodes = psd_nodes.values(),
+                input_files = {"": [node.output_files["write-psd"] for node in nodes]},
+                output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD_CHUNK_%04d" % chunk, boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))}
+            )
+        median_psd_nodes.append(median_psd_node)
     median_psd_node = \
         dagparts.DAGNode(medianPSDJob, dag,
-            parent_nodes = psd_nodes.values(),
-            input_files = {"input-cache": "reference_psd.cache"},
+            parent_nodes = median_psd_nodes,
+            input_files = {"": [node.output_files["output-name"] for node in median_psd_nodes]},
             output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))}
         )
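As with the database merges above, the final REFERENCE_PSD job now lists the per-chunk REFERENCE_PSD_CHUNK_* outputs as its inputs and parents instead of reading every per-segment PSD through reference_psd.cache, so its fan-in drops from the full set of PSD nodes to one node per chunk of 50.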