Commit add832bc authored by AntoniRamosBuades

Settings to run in pablo2

parent 4dbbf335
Branch: master
Pipeline #266299 failed
@@ -17,14 +17,12 @@ def remove_argument_from_parser(parser, arg):
     try:
         parser._handle_conflict_resolve(None, [("--" + arg, action)])
     except ValueError as e:
-        logger.warning("Error removing {}: {}".format(arg, e))
-        logger.warning(
-            "Request to remove arg {} from bilby_pipe args, but arg not found".format(arg)
-        )
+        logger.warning(f"Error removing {arg}: {e}")
+        logger.debug(f"Request to remove arg {arg} from bilby_pipe args, but arg not found")
 
 
 class StoreBoolean(argparse.Action):
-    """ argparse class for robust handling of booleans with configargparse
+    """argparse class for robust handling of booleans with configargparse
 
     When using configargparse, if the argument is setup with
     action="store_true", but the default is set to True, then there is no way,
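(Note: the docstring above explains the motivation. As a rough sketch of the idea, hypothetical code rather than the implementation in this file, such an action converts the string a config file supplies into a real boolean:

    import argparse

    class StoreBooleanSketch(argparse.Action):
        # Hypothetical sketch: configargparse hands the action a string such
        # as "true" or "false", so map it onto a real boolean. This lets an
        # option default to True while still being settable to False from a
        # config file, which plain action="store_true" cannot do.
        def __call__(self, parser, namespace, value, option_string=None):
            setattr(namespace, self.dest, str(value).lower() in ("true", "yes", "1"))
)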
@@ -46,9 +44,7 @@ def _create_base_parser(sampler="dynesty"):
     base_parser.add(
         "--version",
         action="version",
-        version="%(prog)s={version}\nbilby={bilby_version}".format(
-            version=__version__, bilby_version=bilby.__version__
-        ),
+        version=f"%(prog)s={__version__}\nbilby={bilby.__version__}",
     )
     if sampler in ["all", "dynesty"]:
         base_parser = _add_dynesty_settings_to_parser(base_parser)
@@ -126,10 +122,52 @@ def _add_dynesty_settings_to_parser(parser):
     )
     dynesty_group.add_argument(
         "--n-check-point",
-        default=100000,
+        default=100,
         type=int,
         help="Steps to take before attempting checkpoint",
     )
+    dynesty_group.add_argument(
+        "--max-its",
+        default=10 ** 10,
+        type=int,
+        help="Maximum number of iterations to sample for (default=1.e10)",
+    )
+    dynesty_group.add_argument(
+        "--max-run-time",
+        default=1.0e10,
+        type=float,
+        help="Maximum time to run for (default=1.e10 s)",
+    )
+    dynesty_group.add_argument(
+        "--fast-mpi",
+        default=False,
+        type=bool,
+        help="Fast MPI communication pattern (default=False)",
+    )
+    dynesty_group.add_argument(
+        "--mpi-timing",
+        default=False,
+        type=bool,
+        help="Print MPI timing when finished (default=False)",
+    )
+    dynesty_group.add_argument(
+        "--mpi-timing-interval",
+        default=0,
+        type=int,
+        help="Interval to write timing snapshot to disk (default=0 -- disabled)",
+    )
+    dynesty_group.add_argument(
+        "--nestcheck",
+        default=False,
+        action="store_true",
+        help=(
+            "Save a 'nestcheck' pickle in the outdir (default=False). "
+            "This pickle stores a `nestcheck.data_processing.process_dynesty_run` "
+            "object, which can be used during post processing to compute the "
+            "implementation and bootstrap errors explained by Higson et al (2018) "
+            "in “Sampling Errors In Nested Sampling Parameter Estimation”."
+        ),
+    )
     return parser
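(Note: the new --fast-mpi and --mpi-timing options use type=bool, which has a well-known argparse pitfall: bool() is called on the raw string, and bool() of any non-empty string is True, so the option cannot be switched off by passing a value on the command line. A small standalone demonstration, not part of the commit:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--fast-mpi", default=False, type=bool)

    # bool("False") is True because the string is non-empty, so the
    # option is enabled regardless of the value passed on the CLI.
    print(parser.parse_args(["--fast-mpi", "False"]).fast_mpi)  # prints: True
)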
@@ -231,8 +269,8 @@ def _add_misc_settings_to_parser(parser):
     )
     misc_group.add_argument(
         "--sampling-seed",
-        type=int,
-        default=1234,
+        type=bilby_pipe.utils.noneint,
+        default=None,
         help="Random seed for sampling, parallel runs will be incremented",
     )
     misc_group.add_argument(
@@ -254,9 +292,14 @@ def _add_misc_settings_to_parser(parser):
     )
     misc_group.add_argument(
         "--check-point-deltaT",
-        default=600,
+        default=3600,
         type=float,
-        help="Write a checkpoint resume file and diagnostic plots every deltaT [s]",
+        help="Write a checkpoint resume file and diagnostic plots every deltaT [s].",
     )
+    misc_group.add_argument(
+        "--rotate-checkpoints",
+        action="store_true",
+        help="If true, backup checkpoint before overwriting (ending in '.bk').",
+    )
     return parser
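(Note: the --sampling-seed change above relies on bilby_pipe.utils.noneint, which accepts either an integer or a "None" placeholder from the config file. A minimal sketch of what such a converter does; the actual bilby_pipe implementation may differ:

    def noneint(value):
        # Hypothetical sketch: treat missing or explicit "None" values as
        # None, otherwise cast to int, making the option an optional integer.
        if value is None or str(value).lower() == "none":
            return None
        return int(value)
)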
@@ -266,8 +309,11 @@ def _add_slurm_settings_to_parser(parser):
     slurm_group.add_argument(
         "--nodes", type=int, required=True, help="Number of nodes to use"
     )
+    #slurm_group.add_argument(
+    #    "--ntasks-per-node", type=int, required=True, help="Number of tasks per node"
+    #)
     slurm_group.add_argument(
-        "--ntasks-per-node", type=int, required=True, help="Number of tasks per node"
+        "--cpus-per-task", type=int, required=True, help="Number of CPUs per task"
     )
     slurm_group.add_argument(
         "--time",
@@ -280,7 +326,7 @@ def _add_slurm_settings_to_parser(parser):
         "--mem-per-cpu",
         type=str,
         default=None,
-        help="Memory per CPU",
+        help="Memory per CPU (defaults to None)",
     )
     slurm_group.add_argument(
         "--extra-lines",
@@ -322,12 +368,12 @@ def _create_reduced_bilby_pipe_parser():
         "sampler",
         "sampling-seed",
         "sampler-kwargs",
-        "plot_calibration",
-        "plot_corner",
-        "plot_format",
-        "plot_marginal",
-        "plot_skymap",
-        "plot_waveform",
+        "plot-calibration",
+        "plot-corner",
+        "plot-format",
+        "plot-marginal",
+        "plot-skymap",
+        "plot-waveform",
     ]
     for arg in bilby_pipe_arguments_to_ignore:
         remove_argument_from_parser(bilby_pipe_parser, arg)
@@ -5,7 +5,6 @@ from .utils import get_cli_args
 
 
 def setup_submit(data_dump_file, inputs, args):
-
     # Create analysis nodes
     analysis_nodes = []
     for idx in range(args.n_parallel):
@@ -19,17 +18,16 @@ def setup_submit(data_dump_file, inputs, args):
     else:
         final_analysis_node = analysis_nodes[0]
 
-    bash_script = "{}/bash_{}.sh".format(inputs.submit_directory, inputs.label)
+    bash_script = f"{inputs.submit_directory}/bash_{inputs.label}.sh"
     with open(bash_script, "w+") as ff:
         dependent_job_ids = []
         for ii, node in enumerate(analysis_nodes):
-            print("jid{}=$(sbatch {})".format(ii, node.filename), file=ff)
-            dependent_job_ids.append("${{jid{}##* }}".format(ii))
+            print(f"jid{ii}=$(sbatch {node.filename})", file=ff)
+            dependent_job_ids.append(f"${{jid{ii}##* }}")
         if len(analysis_nodes) > 1:
             print(
-                "sbatch --dependency=afterok:{} {}".format(
-                    ":".join(dependent_job_ids), final_analysis_node.filename
-                ),
+                f"sbatch --dependency=afterok:{':'.join(dependent_job_ids)} "
+                f"{final_analysis_node.filename}",
                 file=ff,
             )
         print('squeue -u $USER -o "%u %.10j %.8A %.4C %.40E %R"', file=ff)
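(Note: with a hypothetical label run0, two parallel analysis nodes, and submit files under outdir/submit, the bash script written here would look roughly like the lines below; the ${jid0##* } expansion strips the leading "Submitted batch job " text from sbatch's output, leaving just the job id for the dependency list:

    jid0=$(sbatch outdir/submit/analysis_run0_0.sh)
    jid1=$(sbatch outdir/submit/analysis_run0_1.sh)
    sbatch --dependency=afterok:${jid0##* }:${jid1##* } outdir/submit/merge_run0.sh
    squeue -u $USER -o "%u %.10j %.8A %.4C %.40E %R"
)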
@@ -40,23 +38,25 @@ def setup_submit(data_dump_file, inputs, args):
 class BaseNode(object):
     def get_lines(self):
         lines = ["#!/bin/bash"]
-        lines.append("#SBATCH --job-name={}".format(self.job_name))
+        lines.append(f"#SBATCH --job-name={self.job_name}")
         if self.nodes > 1:
-            lines.append("#SBATCH --nodes={}".format(self.nodes))
-        if self.ntasks_per_node > 1:
-            lines.append("#SBATCH --ntasks-per-node={}".format(self.ntasks_per_node))
-        lines.append("#SBATCH --time={}".format(self.time))
+            lines.append(f"#SBATCH --nodes={self.nodes}")
+        #if self.ntasks_per_node > 1:
+        #    lines.append(f"#SBATCH --ntasks-per-node={self.ntasks_per_node}")
+        if self.cpus_per_task > 1:
+            lines.append("#SBATCH --cpus-per-task={}".format(self.cpus_per_task))
+        lines.append(f"#SBATCH --time={self.time}")
         if self.args.mem_per_cpu is not None:
-            lines.append("#SBATCH --mem-per-cpu={}".format(self.mem_per_cpu))
-        lines.append("#SBATCH --output={}/{}.log".format(self.logs, self.job_name))
+            lines.append(f"#SBATCH --mem-per-cpu={self.mem_per_cpu}")
+        lines.append(f"#SBATCH --output={self.logs}/{self.job_name}.log")
         if self.args.slurm_extra_lines is not None:
             slurm_extra_lines = " ".join(
-                ["--{}".format(lin) for lin in self.args.slurm_extra_lines.split()]
+                [f"--{lin}" for lin in self.args.slurm_extra_lines.split()]
             )
             for line in slurm_extra_lines.split():
-                lines.append("#SBATCH {}".format(line))
+                lines.append(f"#SBATCH {line}")
             lines.append("")
-        if self.args.extra_lines is not None:
+        if self.args.extra_lines:
             for line in self.args.extra_lines.split(";"):
                 lines.append(line.strip())
             lines.append("")
@@ -78,13 +78,14 @@ class AnalysisNode(BaseNode):
         self.inputs = inputs
         self.args = args
         self.idx = idx
-        self.filename = "{}/analysis_{}_{}.sh".format(
-            self.inputs.submit_directory, self.inputs.label, self.idx
+        self.filename = (
+            f"{self.inputs.submit_directory}/"
+            f"analysis_{self.inputs.label}_{self.idx}.sh"
         )
-        self.job_name = "{}_{}".format(self.idx, self.inputs.label)
+        self.job_name = f"{self.idx}_{self.inputs.label}"
         self.nodes = self.args.nodes
-        self.ntasks_per_node = self.args.ntasks_per_node
+        #self.ntasks_per_node = self.args.ntasks_per_node
+        self.cpus_per_task = self.args.cpus_per_task
         self.time = self.args.time
         self.mem_per_cpu = self.args.mem_per_cpu
         self.logs = self.inputs.data_analysis_log_directory
@@ -104,17 +105,17 @@ class AnalysisNode(BaseNode):
             return "parallel_bilby_ptemcee_analysis"
         else:
             raise ValueError(
-                "Unable to determine sampler to use from {}".format(self.args.sampler)
+                f"Unable to determine sampler to use from {self.args.sampler}"
             )
 
     @property
     def label(self):
-        return "{}_{}".format(self.inputs.label, self.idx)
+        return f"{self.inputs.label}_{self.idx}"
 
     @property
     def output_filename(self):
-        return "{}/{}_{}_result.json".format(
-            self.inputs.result_directory, self.inputs.label, self.idx
+        return (
+            f"{self.inputs.result_directory}/{self.inputs.label}_{self.idx}_result.json"
         )
 
     def get_contents(self):
@@ -122,15 +123,16 @@ class AnalysisNode(BaseNode):
         lines.append('export MKL_NUM_THREADS="1"')
         lines.append('export MKL_DYNAMIC="FALSE"')
         lines.append("export OMP_NUM_THREADS=1")
-        lines.append("export MPI_PER_NODE={}".format(self.args.ntasks_per_node))
+        #lines.append("export MPI_PER_NODE={}".format(self.args.ntasks_per_node))
+        lines.append("export MPI_PER_NODE={}".format(self.args.cpus_per_task))
         lines.append("")
 
         run_string = self.get_run_string()
-        lines.append("mpirun {} {}".format(self.executable, run_string))
+        lines.append(f"mpirun {self.executable} {run_string}")
 
         return "\n".join(lines)
 
     def get_run_string(self):
-        run_list = ["{}".format(self.data_dump_file)]
+        run_list = [f"{self.data_dump_file}"]
         for key, val in vars(self.analysis_args).items():
             if key in ["data_dump", "label", "outdir", "sampling_seed"]:
                 continue
@@ -138,15 +140,13 @@ class AnalysisNode(BaseNode):
             if val != input_val:
                 if input_val is True:
                     # For flags only add the flag
-                    run_list.append("--{}".format(key.replace("_", "-")))
+                    run_list.append(f"--{key.replace('_', '-')}")
                 else:
-                    run_list.append("--{} {}".format(key.replace("_", "-"), input_val))
+                    run_list.append(f"--{key.replace('_', '-')} {input_val}")
 
-        run_list.append("--label {}".format(self.label))
-        run_list.append("--outdir {}".format(abspath(self.inputs.result_directory)))
-        run_list.append(
-            "--sampling-seed {}".format(self.analysis_args.sampling_seed + self.idx)
-        )
+        run_list.append(f"--label {self.label}")
+        run_list.append(f"--outdir {abspath(self.inputs.result_directory)}")
+        run_list.append(f"--sampling-seed {self.inputs.sampling_seed + self.idx}")
 
         return " ".join(run_list)
@@ -157,16 +157,15 @@ class MergeNodes(BaseNode):
         self.inputs = inputs
         self.args = args
 
-        self.job_name = "merge_{}".format(self.inputs.label)
+        self.job_name = f"merge_{self.inputs.label}"
         self.nodes = 1
-        self.ntasks_per_node = 1
+        #self.ntasks_per_node = 1
+        self.cpus_per_task = 1
         self.time = "1:00:00"
         self.mem_per_cpu = "16GB"
         self.logs = self.inputs.data_analysis_log_directory
-        self.filename = "{}/merge_{}.sh".format(
-            self.inputs.submit_directory, self.inputs.label
-        )
+        self.filename = f"{self.inputs.submit_directory}/merge_{self.inputs.label}.sh"
 
     @property
     def file_list(self):
@@ -174,13 +173,14 @@
     @property
     def merged_result_label(self):
-        return "{}_merged".format(self.inputs.label)
+        return f"{self.inputs.label}_merged"
 
     def get_contents(self):
         lines = self.get_lines()
         lines.append(
-            "bilby_result -r {} --merge --label {} --outdir {}".format(
-                self.file_list, self.merged_result_label, self.inputs.result_directory
-            )
+            f"bilby_result -r {self.file_list} "
+            f"--merge "
+            f"--label {self.merged_result_label} "
+            f"--outdir {self.inputs.result_directory}"
         )
         return "\n".join(lines)