Commit b4592686 authored by Patrick Godwin

gstlal_inspiral_pipe: factor out more merge and cluster steps, add minor comments

parent 244f7ee0
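Overview of the change: the "merge the sqlite databases into one file, then cluster it" step that was previously spelled out inline with two dagparts.DAGNode calls is now routed through the existing merge_cluster_layer() helper, which gains an optional input_files argument. The sketch below is a minimal, self-contained illustration of that pattern only; MiniDAG/MiniNode and all file names are hypothetical stand-ins for dagparts and the real jobs table, not gstlal code.

```python
# A minimal sketch of the "factor out merge + cluster" pattern in this commit.
# MiniDAG and MiniNode are hypothetical stand-ins for dagparts.DAG / dagparts.DAGNode;
# Condor submit files, the jobs table and cache-file handling are omitted.

class MiniNode:
	def __init__(self, job, parent_nodes, **kwargs):
		self.job = job
		self.parent_nodes = parent_nodes
		self.kwargs = kwargs

class MiniDAG:
	def __init__(self):
		self.nodes = []

	def add(self, job, parent_nodes, **kwargs):
		node = MiniNode(job, parent_nodes, **kwargs)
		self.nodes.append(node)
		return node

def merge_cluster_layer(dag, parent_nodes, db, db_cache, sqlfile, input_files=None):
	"""Merge a cache of databases into db, then cluster it with sqlfile."""
	# Extra inputs (vetoes, segment or injection files) are optional, mirroring
	# the new input_files keyword added in this commit.
	input_ = {"": input_files} if input_files else {}
	merge_node = dag.add("toSqlite", parent_nodes,
		input_files=input_,
		input_cache_files={"input-cache": db_cache},
		output_files={"database": db})
	# Clustering always runs on the freshly merged database; the cluster node
	# is what downstream jobs should treat as their parent.
	return dag.add("lalappsRunSqlite", [merge_node],
		opts={"sql-file": sqlfile},
		input_files={"": db})

if __name__ == "__main__":
	dag = MiniDAG()
	cluster_node = merge_cluster_layer(
		dag, [], "H1L1-ALL_LLOID-1187000000-100000.sqlite",
		["chunk-0.sqlite", "chunk-1.sqlite"], "simplify_and_cluster.sql",
		input_files=["vetoes.xml.gz", "segments.xml.gz"])
	print(len(dag.nodes), cluster_node.job)  # prints: 2 lalappsRunSqlite
```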
@@ -894,6 +894,8 @@ def summary_plot_layer(dag, jobs, parent_nodes, options):
 	return plotnodes
 
 def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete):
+	"""clean intermediate merger products
+	"""
 	for db in dbs_to_delete:
 		dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
 			input_files = {"": db}
@@ -921,6 +923,8 @@ def inj_psd_layer(segsdict, options):
 	return psd_nodes, ref_psd_parent_nodes
 
 def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
+	"""mass model node
+	"""
 	if options.mass_model_file is None:
 		# choose, arbitrarily, the lowest instrument in alphabetical order
 		model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = jobs['model'].output_path)
@@ -934,10 +938,18 @@ def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
 	else:
 		return [], options.mass_model_file
 
-def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile):
+def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None):
+	"""merge and cluster from sqlite database
+	"""
+	if input_files:
+		input_ = {"": input_files}
+	else:
+		input_ = {}
+
 	# Merge database into chunks
 	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
 		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+		input_files = input_,
 		input_cache_files = {"input-cache": db_cache},
 		output_files = {"database":db},
 		input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
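The new keyword is handled with a truthiness guard, so both None and an empty list mean "no extra inputs for the merge job beyond the input cache". A stand-alone illustration of just that guard (build_input_dict is a hypothetical name, not part of the pipeline):

```python
def build_input_dict(input_files=None):
	# mirrors the guard added to merge_cluster_layer: an empty dict means the
	# merge job gets no extra command-line inputs beyond the input cache
	return {"": input_files} if input_files else {}

print(build_input_dict())                                      # {}
print(build_input_dict(["vetoes.xml.gz", "segments.xml.gz"]))  # {'': ['vetoes.xml.gz', 'segments.xml.gz']}
```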
@@ -1180,20 +1192,10 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
 		noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
 		final_nodes.append(noninjsqlitenode)
 
+	input_files = (vetoes + [options.frame_segments_file])
+	input_cache_files = [node.input_files[""] for node in final_nodes]
 	noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
-	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = final_nodes,
-		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-		input_files = {"": (vetoes + [options.frame_segments_file])},
-		input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
-		output_files = {"database":noninjdb},
-		input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
-	)
-
-	# cluster the final non injection database
-	noninjsqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-		opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-		input_files = {"":noninjdb}
-	)
+	noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
 
 	cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode],
 		input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
@@ -1252,20 +1254,11 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
 	# xml_input = injxml
 	xml_input = injxml
 
-	# merge
-	sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = final_nodes + ligolw_add_nodes,
-		opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
-		input_files = {"": (vetoes + [options.frame_segments_file, injections])},
-		input_cache_files = {"input-cache": [node.input_files[""] for node in final_nodes]},
-		output_files = {"database":injdb},
-		input_cache_file_name = injdb.replace('.sqlite','.cache')
-	)
-
-	# cluster
-	clusternode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
-		opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
-		input_files = {"":injdb}
-	)
+	# merge and cluster
+	parent_nodes = final_nodes + ligolw_add_nodes
+	input_files = (vetoes + [options.frame_segments_file, injections])
+	input_cache_files = [node.input_files[""] for node in final_nodes]
+	clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
 
 	clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
 		opts = {"tmp-space":dagparts.condor_scratch_space()},
@@ -1361,6 +1354,8 @@ def compute_fap_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, injdbs, n
 	return outnode, flatten(all_margcache)
 
 def horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir):
+	"""calculate horizon distance
+	"""
 	dagparts.DAGNode(jobs['horizon'], dag,
 		parent_nodes = psd_nodes.values(),
 		input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]},
@@ -1368,6 +1363,8 @@ def horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir):
 	)
 
 def summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs):
+	"""create a summary page
+	"""
 	output_user_tags = ["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"]
 	output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs])
 	output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs])