Commit d40786db authored by Patrick Godwin's avatar Patrick Godwin

gstlal_inspiral_pipe: use OrderedDict for condor opts to preserve order, add...

gstlal_inspiral_pipe: use OrderedDict for condor opts to preserve order, add in dynamic memory requests and ifo dependent memory requests for inspiral jobs
parent bc101361
Pipeline #77190 passed with stages
in 24 minutes and 25 seconds
......@@ -42,6 +42,7 @@ __author__ = 'Chad Hanna <chad.hanna@ligo.org>, Patrick Godwin <patrick.godwin@l
#----------------------------------------------------------
### imports
from collections import OrderedDict
import functools
import itertools
import os
......@@ -292,8 +293,6 @@ def parse_command_line():
parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
# Condor commands
parser.add_option("--request-cpu", default = "4", metavar = "integer", help = "set the inspiral CPU count, default = 4")
parser.add_option("--request-memory", default = "7GB", metavar = "integer", help = "set the inspiral memory, default = 7GB")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
parser.add_option("--max-inspiral-jobs", type="int", metavar = "jobs", help = "Set the maximum number of gstlal_inspiral jobs to run simultaneously, default no constraint.")
......@@ -412,23 +411,57 @@ def get_rank_file(instruments, boundary_seg, n, basename, job=None):
def set_up_jobs(options):
jobs = {}
# default condor options
default_condor_opts = OrderedDict()
default_condor_opts['want_graceful_removal'] = "True"
default_condor_opts['kill_sig'] = "15"
default_condor_opts['request_cpus'] = "1"
default_condor_opts['+MemoryUsage'] = "( 1000 ) * 2 / 3"
default_condor_opts['request_memory'] = "( MemoryUsage ) * 3 / 2"
default_condor_opts['periodic_hold'] = "( MemoryUsage >= ( ( RequestMemory ) * 3 / 2 ) )"
default_condor_opts['periodic_release'] = "(JobStatus == 5) && ((CurrentTime - EnteredCurrentStatus) > 180) && (HoldReasonCode != 34)"
# job-specific condor options
ref_psd_condor_opts = default_condor_opts.copy()
ref_psd_condor_opts['request_cpus'] = "2"
calc_rank_pdf_condor_opts = default_condor_opts.copy()
calc_rank_pdf_condor_opts['request_cpus'] = "4"
svd_condor_opts = default_condor_opts.copy()
svd_condor_opts['+MemoryUsage'] = "( 7000 ) * 2 / 3"
inj_snr_condor_opts = default_condor_opts.copy()
inj_snr_condor_opts['+MemoryUsage'] = "( 2000 ) * 2 / 3"
inj_snr_condor_opts['request_cpus'] = "2"
inspiral_1ifo_condor_opts = default_condor_opts.copy()
inspiral_1ifo_condor_opts['+MemoryUsage'] = "( 3000 ) * 2 / 3"
inspiral_2ifo_condor_opts = default_condor_opts.copy()
inspiral_2ifo_condor_opts['+MemoryUsage'] = "( 4000 ) * 2 / 3"
inspiral_3ifo_condor_opts = default_condor_opts.copy()
inspiral_3ifo_condor_opts['+MemoryUsage'] = "( 7000 ) * 2 / 3"
inspiral_3ifo_condor_opts['request_cpus'] = "2"
# set condor commands
base_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})
ref_psd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
calc_rank_pdf_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})
svd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"})
inj_snr_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})
base_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, default_condor_opts)
ref_psd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, ref_psd_condor_opts)
calc_rank_pdf_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, calc_rank_pdf_condor_opts)
svd_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, svd_condor_opts)
inj_snr_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inj_snr_condor_opts)
sh_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})
inspiral_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {
"request_memory":options.request_memory,
"request_cpus":options.request_cpu,
"want_graceful_removal":"True",
"kill_sig":"15"
})
inspiral_1ifo_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inspiral_1ifo_condor_opts)
inspiral_2ifo_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inspiral_2ifo_condor_opts)
inspiral_3ifo_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, inspiral_3ifo_condor_opts)
if options.dist_stats_cache:
# injection-only run
jobs['gstlalInspiral'] = None
jobs['gstlalInspiral1IFO'] = None
jobs['gstlalInspiral2IFO'] = None
jobs['gstlalInspiral3IFO'] = None
jobs['createPriorDistStats'] = None
jobs['calcRankPDFs'] = None
jobs['calcRankPDFsWithZerolag'] = None
......@@ -450,7 +483,9 @@ def set_up_jobs(options):
jobs['model'] = dagparts.DAGJob("gstlal_inspiral_mass_model", condor_commands = base_condor_commands)
jobs['modelAdd'] = dagparts.DAGJob("gstlal_inspiral_add_mass_models", condor_commands = base_condor_commands)
jobs['horizon'] = dagparts.DAGJob("gstlal_plot_psd_horizon", condor_commands = base_condor_commands)
jobs['gstlalInspiral'] = dagparts.DAGJob("gstlal_inspiral", condor_commands = inspiral_condor_commands)
jobs['gstlalInspiral1IFO'] = dagparts.DAGJob("gstlal_inspiral", tag_base="gstlal_inspiral_1ifo", condor_commands = inspiral_1ifo_condor_commands)
jobs['gstlalInspiral2IFO'] = dagparts.DAGJob("gstlal_inspiral", tag_base="gstlal_inspiral_2ifo", condor_commands = inspiral_2ifo_condor_commands)
jobs['gstlalInspiral3IFO'] = dagparts.DAGJob("gstlal_inspiral", tag_base="gstlal_inspiral_3ifo", condor_commands = inspiral_3ifo_condor_commands)
jobs['createPriorDistStats'] = dagparts.DAGJob("gstlal_inspiral_create_prior_diststats", condor_commands = base_condor_commands)
jobs['calcRankPDFs'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", condor_commands = calc_rank_pdf_condor_commands)
jobs['calcRankPDFsWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", tag_base="gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands=calc_rank_pdf_condor_commands)
......@@ -459,7 +494,9 @@ def set_up_jobs(options):
jobs['marginalizeWithZerolag'] = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base="gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands=base_condor_commands)
# set up rest of jobs
jobs['gstlalInspiralInj'] = dagparts.DAGJob("gstlal_inspiral", tag_base="gstlal_inspiral_inj", condor_commands = inspiral_condor_commands)
jobs['gstlalInspiralInj1IFO'] = dagparts.DAGJob("gstlal_inspiral", tag_base="gstlal_inspiral_inj_1ifo", condor_commands = inspiral_1ifo_condor_commands)
jobs['gstlalInspiralInj2IFO'] = dagparts.DAGJob("gstlal_inspiral", tag_base="gstlal_inspiral_inj_2ifo", condor_commands = inspiral_2ifo_condor_commands)
jobs['gstlalInspiralInj3IFO'] = dagparts.DAGJob("gstlal_inspiral", tag_base="gstlal_inspiral_inj_3ifo", condor_commands = inspiral_3ifo_condor_commands)
jobs['injSplitter'] = dagparts.DAGJob("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = base_condor_commands)
jobs['gstlalInjSnr'] = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = inj_snr_condor_commands)
jobs['ligolwAdd'] = dagparts.DAGJob("ligolw_add", condor_commands = base_condor_commands)
......@@ -616,6 +653,9 @@ def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_m
def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):
inspiral_nodes = {}
for ifos in segsdict:
# FIXME: handles more than 3 ifos with same cpu/memory requests
inspiral_name = 'gstlalInspiral%dIFO' % min(len(ifos), 3)
inspiral_inj_name = 'gstlalInspiralInj%dIFO' % min(len(ifos), 3)
# setup dictionaries to hold the inspiral nodes
inspiral_nodes[(ifos, None)] = {}
......@@ -652,14 +692,14 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
for seg in segsdict[ifos]:
if injection_files:
output_seg_inj_path = subdir_path([jobs['gstlalInspiralInj'].output_path, str(int(seg[0]))[:5]])
output_seg_inj_path = subdir_path([jobs[inspiral_inj_name].output_path, str(int(seg[0]))[:5]])
if jobs['gstlalInspiral'] is None:
if jobs[inspiral_name] is None:
# injection-only run
inspiral_nodes[(ifos, None)].setdefault(seg, [None])
else:
output_seg_path = subdir_path([jobs['gstlalInspiral'].output_path, str(int(seg[0]))[:5]])
output_seg_path = subdir_path([jobs[inspiral_name].output_path, str(int(seg[0]))[:5]])
for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)):
bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)]
# setup output names
......@@ -674,7 +714,7 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
threshold_values = get_threshold_values(bgbin_indices, svd_bank_strings, options)
# non injection node
noninjnode = dagparts.DAGNode(jobs['gstlalInspiral'], dag,
noninjnode = dagparts.DAGNode(jobs[inspiral_name], dag,
parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]),
opts = {
"psd-fft-length":options.psd_fft_length,
......@@ -745,7 +785,8 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
threshold_values = get_threshold_values(bgbin_indices, svd_bank_strings, options)
# setup injection node
injnode = dagparts.DAGNode(jobs['gstlalInspiralInj'], dag,
# FIXME: handles more than 3 ifos with same cpu/memory requests
injnode = dagparts.DAGNode(jobs[inspiral_inj_name], dag,
parent_nodes = parents,
opts = {
"psd-fft-length":options.psd_fft_length,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment