Commit 3c6c1b25 authored by Patrick Godwin

gstlal_inspiral_pipe: factor out merge/cluster step into merge_cluster_layer()

parent 56f0b449
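For orientation, the sketch below shows the shape of the helper this commit introduces and how the layers below call it: merge_cluster_layer() chains a 'toSqlite' merge job with a 'lalappsRunSqlite' clustering job and returns the clustering node so callers can hang downstream jobs off of it. This is an illustrative sketch, not part of the commit itself; the import line and the example file name in the trailing comment are assumptions, everything else mirrors the diff below.

import os
from gstlal import dagparts  # assumed import path; gstlal_inspiral_pipe uses dagparts.DAGNode throughout

def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, options):
    # merge the databases listed in db_cache into a single SQLite database `db`
    sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
        opts = {"replace": "", "tmp-space": dagparts.condor_scratch_space()},
        input_cache_files = {"input-cache": db_cache},
        output_files = {"database": db},
        input_cache_file_name = os.path.basename(db).replace('.sqlite', '.cache')
    )
    # cluster the merged database in place; return the clustering node so it
    # can serve as the parent of downstream jobs
    return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
        opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
        input_files = {"": db}
    )

# Each former merge-then-cluster block in the layers below collapses to a single call, e.g.
# (the file name here is purely illustrative):
#   noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
#   clusternode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options)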
@@ -195,6 +195,20 @@ def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
return lloid_output, lloid_diststats
def set_up_scripts(options):
# Make an xml integrity checker
if options.gzip_test:
with open("gzip_test.sh", "w") as f:
f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
# A pre script to back up data before feeding to lossy programs
# (e.g. clustering routines)
with open("store_raw.sh", "w") as f:
f.write("""#!/bin/bash
for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
#----------------------------------------------------------
### command line options
@@ -352,7 +366,7 @@ def parse_command_line():
#----------------------------------------------------------
### DAG setup utilities
### DAG utilities
def set_up_jobs(options):
jobs = {}
@@ -427,21 +441,6 @@ def set_up_jobs(options):
return jobs
def set_up_scripts(options):
# Make an xml integrity checker
if options.gzip_test:
with open("gzip_test.sh", "w") as f:
f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
# A pre script to back up data before feeding to lossy programs
# (e.g. clustering routines)
with open("store_raw.sh", "w") as f:
f.write("""#!/bin/bash
for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
#----------------------------------------------------------
### DAG layers
@@ -943,6 +942,20 @@ def model_layer(dag, jobs, parent_nodes, instruments, options, seg, template_ban
else:
return [], options.mass_model_file
def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, options):
# Merge database into chunks
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache": db_cache},
output_files = {"database":db},
input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
)
# cluster database
return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file": options.cluster_sql_file, "tmp-space": dagparts.condor_scratch_space()},
input_files = {"": db}
)
def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, mass_model_add_node, mass_model_file):
likelihood_nodes = {}
@@ -1041,62 +1054,27 @@ def rank_and_merge_layer(dag, jobs, svd_nodes, inspiral_nodes, lloid_output, llo
merge_nodes = []
# Flatten the input/output files from calc_likelihood
inputs = flatten([node.input_files["input-cache"] for node in nodes])
if inj is None:
# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
for n in range(0, len(inputs), files_to_group):
merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":inputs[n:n+files_to_group]}
)
)
if options.copy_raw_results:
merge_nodes[-1].set_pre_script("store_raw.sh")
merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
# Merging all the dbs from the same sub bank
for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = merge_nodes,
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache":inputs},
output_files = {"database":db},
input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
)
sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":db}
)
outnodes.setdefault(None, []).append(sqlitenode)
else:
# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
for n in range(0, len(inputs), files_to_group):
merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":inputs[n:n+files_to_group]}
)
)
if options.copy_raw_results:
merge_nodes[-1].set_pre_script("store_raw.sh")
merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
# Merging all the dbs from the same sub bank and injection run
for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
injdb_files = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
injdb = dagparts.group_T050017_filename_from_T050017_files(injdb_files, '.sqlite')
injdb = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(injdb).description[:4]]), injdb)
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = merge_nodes,
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache":inputs},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":injdb}
)
outnodes.setdefault(sim_tag_from_inj_file(inj), []).append(sqlitenode)
# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
for n in range(0, len(inputs), files_to_group):
merge_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = nodes,
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":inputs[n:n+files_to_group]}
)
)
if options.copy_raw_results:
merge_nodes[-1].set_pre_script("store_raw.sh")
merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
# Merging all the dbs from the same sub bank
outnode_key = sim_tag_from_inj_file(inj) if inj is not None else None
for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
sqlitenode = merge_cluster_layer(dag, jobs, merge_nodes, db, inputs, options)
outnodes.setdefault(outnode_key, []).append(sqlitenode)
# make sure outnodes has a None key, even if its value is an empty list
outnodes.setdefault(None, [])
@@ -1123,21 +1101,12 @@ def merge_in_bin_layer(dag, jobs, options):
else:
for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
# merge all of the dbs from the same subbank
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache":dbs},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":noninjdb}
)
db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
sqlitenode = merge_cluster_layer(dag, jobs, [], db, dbs, options)
outnodes.setdefault(None, []).append(sqlitenode)
for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
bgbin_lloid_map = {}
for ce in map(CacheEntry, open(inj_lloid_cache)):
@@ -1145,20 +1114,10 @@ def merge_in_bin_layer(dag, jobs, options):
for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
# merge all of the dbs from the same subbank
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache":dbs},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":injdb}
)
db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
db = os.path.join(subdir_path([jobs['toSqlite'].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
sqlitenode = merge_cluster_layer(dag, jobs, [], db, dbs, options)
outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
else:
@@ -1173,18 +1132,7 @@ def merge_in_bin_layer(dag, jobs, options):
noninjdb = os.path.join(jobs['toSqlite'].output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
# merge all of the dbs from the same subbank
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":noninjdb}
)
sqlitenode = merge_cluster_layer(dag, jobs, [], noninjdb, [ce.path for ce in ce_list], options)
outnodes.setdefault(None, []).append(sqlitenode)
for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
@@ -1193,18 +1141,7 @@ def merge_in_bin_layer(dag, jobs, options):
injdb = os.path.join(jobs['toSqlite'].output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
# merge all of the dbs from the same subbank
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":injdb}
)
sqlitenode = merge_cluster_layer(dag, jobs, [], injdb, [ce.path for ce in ce_list], options)
outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
@@ -1232,21 +1169,11 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
dbs = nodes
parents = []
# Merge the final non injection database into chunks
dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parents,
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache": dbs},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
# cluster the final non injection database
noninjsqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":noninjdb}
)
# Merge and cluster the final non injection database
noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options)
chunk_nodes.append(noninjsqlitenode)
dbs_to_delete.append(noninjdb)
@@ -1261,18 +1188,9 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
final_nodes = []
for chunk, nodes in enumerate(chunks(innodes[None], 10)):
noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = nodes,
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache": [node.input_files[""] for node in nodes]},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
# cluster the final non injection database
noninjsqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":noninjdb}
)
noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options)
final_nodes.append(noninjsqlitenode)
noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
@@ -1319,20 +1237,8 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
# merge
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parents,
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache":dbs},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
# cluster
clusternode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":injdb}
)
# merge and cluster
clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options)
chunk_nodes.append(clusternode)
dbs_to_delete.append(injdb)
@@ -1342,19 +1248,8 @@ def finalize_run_layer(dag, jobs, innodes, ligolw_add_nodes, options, instrument
# Setup the final output names, etc.
injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')
# merge
sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = injnodes,
opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
input_cache_files = {"input-cache": [node.input_files[""] for node in injnodes]},
output_files = {"database":injdb},
input_cache_file_name = injdb.replace('.sqlite','.cache')
)
# cluster
clusternode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
input_files = {"":injdb}
)
# merge and cluster
clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options)
final_nodes.append(clusternode)
# Setup the final output names, etc.