Commit f94d2f0c authored by Avi Vajpeyi, committed by Gregory Ashton

modularise job creation code

parent 9cadeafc

__init__.py
from .bilby_pipe_dag_creator import generate_dag

bilby_pipe_dag_creator.py
import numpy as np
from ..utils import BilbyPipeError, convert_string_to_tuple, logger
from .dag import Dag
from .nodes import (
AnalysisNode,
GenerationNode,
MergeNode,
PESummaryNode,
PlotNode,
PostProcessAllResultsNode,
PostProcessSingleResultsNode,
)
from .overview import create_overview
def get_trigger_time_list(inputs):
""" Returns a list of GPS trigger times for each data segment """
if inputs.gaussian_noise:
trigger_times = [0] * inputs.n_simulation
elif inputs.trigger_time is not None:
trigger_times = [inputs.trigger_time]
elif inputs.gps_tuple is not None:
start, dt, N = convert_string_to_tuple(inputs.gps_tuple)
start_times = np.linspace(start, start + (N - 1) * dt, N)
trigger_times = start_times + inputs.duration - inputs.post_trigger_duration
elif inputs.gps_file is not None:
start_times = inputs.gpstimes
trigger_times = start_times + inputs.duration - inputs.post_trigger_duration
else:
raise BilbyPipeError("Unable to determine input trigger times from ini file")
logger.info("Setting segment trigger-times {}".format(trigger_times))
return trigger_times
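# A minimal sketch of the gps_tuple branch above, with illustrative values only:
#   start, dt, N = 1126259462, 4096, 3
#   duration, post_trigger_duration = 4, 2
#   start_times   -> 1126259462, 1126263558, 1126267654
#   trigger_times -> 1126259464, 1126263560, 1126267656
# i.e. each trigger time sits post_trigger_duration seconds before the end of
# its duration-long segment.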
def get_detectors_list(inputs):
detectors_list = []
detectors_list.append(inputs.detectors)
if inputs.coherence_test:
for detector in inputs.detectors:
detectors_list.append([detector])
return detectors_list
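# Illustrative example only: with inputs.detectors = ["H1", "L1"] and
# coherence_test = True this returns [["H1", "L1"], ["H1"], ["L1"]], i.e. the
# coherent network analysis plus one single-detector analysis per interferometer.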
def get_parallel_list(inputs):
if inputs.n_parallel == 1:
return [""]
else:
return ["par{}".format(idx) for idx in range(inputs.n_parallel)]
def generate_dag(inputs):
dag = Dag(inputs)
trigger_times = get_trigger_time_list(inputs)
generation_node_list = []
for idx, trigger_time in enumerate(trigger_times):
generation_node = GenerationNode(
inputs, trigger_time=trigger_time, idx=idx, dag=dag
)
generation_node_list.append(generation_node)
detectors_list = get_detectors_list(inputs)
parallel_list = get_parallel_list(inputs)
merged_node_list = []
all_parallel_node_list = []
for generation_node in generation_node_list:
for detectors in detectors_list:
parallel_node_list = []
for parallel_idx in parallel_list:
analysis_node = AnalysisNode(
inputs,
generation_node=generation_node,
detectors=detectors,
parallel_idx=parallel_idx,
dag=dag,
sampler=inputs.sampler,
)
parallel_node_list.append(analysis_node)
all_parallel_node_list.append(analysis_node)
if len(parallel_node_list) == 1:
merged_node_list.append(analysis_node)
else:
merge_node = MergeNode(
inputs=inputs,
parallel_node_list=parallel_node_list,
detectors=detectors,
dag=dag,
)
merged_node_list.append(merge_node)
plot_nodes_list = []
for merged_node in merged_node_list:
if inputs.create_plots:
plot_nodes_list.append(PlotNode(inputs, merged_node, dag=dag))
if inputs.single_postprocessing_executable:
PostProcessSingleResultsNode(inputs, merged_node, dag=dag)
if inputs.create_summary:
PESummaryNode(inputs, merged_node_list, generation_node_list, dag=dag)
if inputs.postprocessing_executable is not None:
PostProcessAllResultsNode(inputs, merged_node_list, dag)
dag.build()
create_overview(
inputs,
generation_node_list,
all_parallel_node_list,
merged_node_list,
plot_nodes_list,
)
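# A sketch of how the DAG above scales (all settings illustrative): with 2
# trigger times, detectors = ["H1", "L1"], coherence_test = True and
# n_parallel = 2, the nested loops create
#   2 GenerationNodes (one per trigger time)
#   2 * 3 * 2 = 12 AnalysisNodes (segments x detector sets x parallel runs)
#   2 * 3 = 6 MergeNodes (one per segment and detector set)
# plus the optional plot, summary, and post-processing nodes.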

dag.py
import os
import sys
import pycondor
from ..utils import logger
from . import slurm
class Dag(object):
""" Base Dag object, handles the creation of the DAG structure """
def __init__(self, inputs):
self.inputs = inputs
self.dag_name = "dag_{}".format(inputs.label)
# The slurm setup uses the pycondor dag as a base
if self.inputs.scheduler.lower() in ["condor", "slurm"]:
self.setup_pycondor_dag()
def setup_pycondor_dag(self):
self.pycondor_dag = pycondor.Dagman(
name=self.dag_name, submit=self.inputs.submit_directory
)
def build(self):
if self.inputs.scheduler.lower() == "condor":
self.build_pycondor_dag()
self.write_bash_script()
elif self.inputs.scheduler.lower() == "slurm":
self.scheduler = self.inputs.scheduler
self.scheduler_args = self.inputs.scheduler_args
self.scheduler_module = self.inputs.scheduler_module
self.scheduler_env = self.inputs.scheduler_env
self.build_slurm_submit()
def build_pycondor_dag(self):
""" Build the pycondor dag, optionally submit them if requested """
submitted = False
if self.inputs.submit:
try:
self.pycondor_dag.build_submit(fancyname=False)
submitted = True
except OSError:
logger.warning("Unable to submit files")
self.pycondor_dag.build(fancyname=False)
else:
self.pycondor_dag.build(fancyname=False)
if submitted:
logger.info("DAG generation complete and submitted")
else:
command_line = "$ condor_submit_dag {}".format(
os.path.relpath(self.pycondor_dag.submit_file)
)
logger.info(
"DAG generation complete, to submit jobs run:\n {}".format(
command_line
)
)
# Debugging feature: create a "visualisation" of the DAG
if "--create-dag-plot" in sys.argv:
try:
self.pycondor_dag.visualize(
"{}/{}_visualization.png".format(
self.inputs.submit_directory, self.pycondor_dag.name
)
)
except Exception:
pass
def build_slurm_submit(self):
""" Build slurm submission scripts """
slurm.SubmitSLURM(self)
def write_bash_script(self):
""" Write the dag to a bash script for command line running """
with open(self.bash_file, "w") as ff:
ff.write("#!/usr/bin/env bash\n\n")
for node in self.pycondor_dag.nodes:
ff.write("# {}\n".format(node.name))
ff.write(
"# PARENTS {}\n".format(
" ".join([job.name for job in node.parents])
)
)
ff.write(
"# CHILDREN {}\n".format(
" ".join([job.name for job in node.children])
)
)
job_str = "{} {}\n\n".format(node.executable, node.args[0].arg)
ff.write(job_str)
@property
def bash_file(self):
bash_file = self.pycondor_dag.submit_file.replace(".submit", ".sh").replace(
"dag_", "bash_"
)
return bash_file
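# Illustrative mapping only: a submit file "outdir/submit/dag_my_label.submit"
# gives the bash script "outdir/submit/bash_my_label.sh".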

node.py
import os
import re
import shutil
import subprocess
from pathlib import Path
import pycondor
from ..utils import CHECKPOINT_EXIT_CODE, ArgumentsString, logger
class Node(object):
""" Base Node object, handles creation of arguments, executables, etc """
def __init__(self, inputs):
self.inputs = inputs
self._universe = "vanilla"
self.request_disk = None
self.online_pe = self.inputs.online_pe
self.getenv = True
self.notification = False
self.retry = None
self.verbose = 0
self.extra_lines = list(self.inputs.extra_lines)
self.requirements = (
[self.inputs.requirements] if self.inputs.requirements else []
)
@property
def universe(self):
return self._universe
def process_node(self):
self.create_pycondor_job()
if self.inputs.run_local:
logger.info(
"Running command: "
+ " ".join([self.executable] + self.arguments.argument_list)
)
subprocess.run([self.executable] + self.arguments.argument_list, check=True)
@staticmethod
def _get_executable_path(exe_name):
exe = shutil.which(exe_name)
if exe is not None:
return exe
else:
raise OSError(
"{} not installed on this system, unable to proceed".format(exe_name)
)
def setup_arguments(
self, add_command_line_args=True, add_ini=True, add_unknown_args=True
):
self.arguments = ArgumentsString()
if add_ini:
self.arguments.add_positional_argument(self.inputs.complete_ini_file)
if add_unknown_args:
self.arguments.add_unknown_args(self.inputs.unknown_args)
if add_command_line_args:
self.arguments.add_command_line_arguments()
@property
def log_directory(self):
raise NotImplementedError()
def create_pycondor_job(self):
job_name = self.job_name
self.extra_lines.extend(
_log_output_error_submit_lines(self.log_directory, job_name)
)
self.extra_lines.append("accounting_group = {}".format(self.inputs.accounting))
if self.online_pe:
self.extra_lines.append("+Online_CBC_PE_Daily = True")
self.requirements.append("((TARGET.Online_CBC_PE_Daily =?= True))")
if self.universe != "local" and self.inputs.osg:
_osg_lines, _osg_reqs = self._osg_submit_options(
self.executable, has_ligo_frames=True
)
self.extra_lines.extend(_osg_lines)
self.requirements.append(_osg_reqs)
self.job = pycondor.Job(
name=job_name,
executable=self.executable,
submit=self.inputs.submit_directory,
request_memory=self.request_memory,
request_disk=self.request_disk,
request_cpus=self.request_cpus,
getenv=self.getenv,
universe=self.universe,
initialdir=self.inputs.initialdir,
notification=self.notification,
requirements=" && ".join(self.requirements),
extra_lines=self.extra_lines,
dag=self.dag.pycondor_dag,
arguments=self.arguments.print(),
retry=self.retry,
verbose=self.verbose,
)
logger.debug("Adding job: {}".format(job_name))
@staticmethod
def _checkpoint_submit_lines():
return [
"+SuccessCheckpointExitCode = {}".format(CHECKPOINT_EXIT_CODE),
"+WantFTOnCheckpoint = True",
]
@staticmethod
def _condor_file_transfer_lines(inputs, outputs):
return [
"should_transfer_files = YES",
"transfer_input_files = {}".format(",".join(inputs)),
"transfer_output_files = {}".format(",".join(outputs)),
"when_to_transfer_output = ON_EXIT_OR_EVICT",
"stream_error = True",
"stream_output = True",
]
@staticmethod
def _relative_topdir(path, reference):
"""Returns the top-level directory name of a path relative
to a reference
"""
try:
return str(Path(path).resolve().relative_to(reference))
except ValueError as exc:
exc.args = ("cannot format {} relative to {}".format(path, reference),)
raise
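# Illustrative example only, assuming outdir sits directly under initialdir:
#   Node._relative_topdir("/work/run/outdir", "/work/run")  # -> "outdir"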
def _osg_submit_options(self, executable, has_ligo_frames=False):
"""Returns the extra submit lines and requirements to enable running
a job on the Open Science Grid
Returns
-------
lines : list
the list of extra submit lines to include
requirements : str
the extra requirements line to include
"""
# required for OSG submission
lines = ["+OpenScienceGrid = True"]
requirements = ["(IS_GLIDEIN=?=True)"]
# if we need GWF data:
if has_ligo_frames:
requirements.append("(HAS_LIGO_FRAMES=?=True)")
# if need a /cvmfs repo for the software:
# NOTE: this should really be applied to _all_ workflows
# that need CVMFS, not just distributed ones, but
# not all local pools advertise the CVMFS repo flags
if executable.startswith("/cvmfs"):
repo = executable.split(os.path.sep, 3)[2]
requirements.append(
"(HAS_CVMFS_{}=?=True)".format(re.sub("[.-]", "_", repo))
)
return lines, " && ".join(requirements)
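# A sketch of the CVMFS branch above, with an illustrative executable path:
#   executable = "/cvmfs/software.igwn.org/conda/envs/igwn/bin/bilby_pipe_analysis"
#   executable.split(os.path.sep, 3)[2]      # -> "software.igwn.org"
#   re.sub("[.-]", "_", "software.igwn.org") # -> "software_igwn_org"
# giving the extra requirement "(HAS_CVMFS_software_igwn_org=?=True)".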
def _log_output_error_submit_lines(logdir, prefix):
"""Returns the filepaths for condor log, output, and error options
Parameters
----------
logdir : str
the target directory for the files
prefix : str
the prefix for the files
Returns
-------
    lines : list of str
        the three submit lines setting the log, output, and error file paths,
        to be passed to pycondor.Job
Examples
--------
    >>> _log_output_error_submit_lines("test", "job")
['log = test/job.log',
'output = test/job.out',
'error = test/job.err']
"""
logpath = Path(logdir)
filename = "{}.{{}}".format(prefix)
return [
"{} = {}".format(opt, str(logpath / filename.format(opt[:3])))
for opt in ("log", "output", "error")
]

nodes/__init__.py
from .analysis_node import AnalysisNode
from .generation_node import GenerationNode
from .merge_node import MergeNode
from .pe_summary_node import PESummaryNode
from .plot_node import PlotNode
from .post_processing_node import (
PostProcessAllResultsNode,
PostProcessSingleResultsNode,
)

nodes/analysis_node.py
import os
from ..node import Node
class AnalysisNode(Node):
def __init__(self, inputs, generation_node, detectors, sampler, parallel_idx, dag):
super().__init__(inputs)
self.dag = dag
self.generation_node = generation_node
self.detectors = detectors
self.parallel_idx = parallel_idx
self.request_cpus = inputs.request_cpus
data_label = generation_node.job_name
base_name = data_label.replace("generation", "analysis")
self.base_job_name = "{}_{}_{}".format(base_name, "".join(detectors), sampler)
if parallel_idx != "":
self.job_name = "{}_{}".format(self.base_job_name, parallel_idx)
else:
self.job_name = self.base_job_name
self.label = self.job_name
self.setup_arguments()
if self.inputs.transfer_files or self.inputs.osg:
data_dump_file = generation_node.data_dump_file
input_files_to_transfer = [
str(data_dump_file),
str(self.inputs.complete_ini_file),
]
self.extra_lines.extend(
self._condor_file_transfer_lines(
input_files_to_transfer,
[self._relative_topdir(self.inputs.outdir, self.inputs.initialdir)],
)
)
self.arguments.add("outdir", os.path.basename(self.inputs.outdir))
for det in detectors:
self.arguments.add("detectors", det)
self.arguments.add("label", self.label)
self.arguments.add("data-dump-file", generation_node.data_dump_file)
self.arguments.add("sampler", sampler)
self.extra_lines.extend(self._checkpoint_submit_lines())
self.process_node()
self.job.add_parent(generation_node.job)
@property
def executable(self):
return self._get_executable_path("bilby_pipe_analysis")
@property
def request_memory(self):
return self.inputs.request_memory
@property
def log_directory(self):
return self.inputs.data_analysis_log_directory
@property
def result_file(self):
return "{}/{}_result.json".format(self.inputs.result_directory, self.job_name)

nodes/generation_node.py
from ...utils import DataDump, logger
from ..node import Node
class GenerationNode(Node):
def __init__(self, inputs, trigger_time, idx, dag):
super().__init__(inputs)
self.inputs = inputs
self.trigger_time = trigger_time
self.idx = idx
self.dag = dag
self.request_cpus = 1
self.setup_arguments()
self.arguments.add("label", self.label)
self.arguments.add("idx", self.idx)
self.arguments.add("trigger-time", self.trigger_time)
if self.inputs.injection_file is not None:
self.arguments.add("injection-file", self.inputs.injection_file)
if self.inputs.timeslide_file is not None:
self.arguments.add("timeslide-file", self.inputs.timeslide_file)
self.process_node()
@property
def executable(self):
return self._get_executable_path("bilby_pipe_generation")
@property
def request_memory(self):
return self.inputs.request_memory_generation
@property
def log_directory(self):
return self.inputs.data_generation_log_directory
@property
def universe(self):
if self.inputs.local_generation:
logger.debug(
"Data generation done locally: please do not use this when "
"submitting a large number of jobs"
)
universe = "local"
else:
logger.debug(
"All data will be grabbed in the {} universe".format(self._universe)
)
universe = self._universe
return universe
@property
def job_name(self):
job_name = "{}_data{}_{}_generation".format(
self.inputs.label, str(self.idx), self.trigger_time
)
job_name = job_name.replace(".", "-")
return job_name
@property
def label(self):
return self.job_name
@property
def data_dump_file(self):
return DataDump.get_filename(self.inputs.data_directory, self.label)
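# Illustrative job naming only: label = "my_label", idx = 0 and
# trigger_time = 1126259462.4 give
# job_name = "my_label_data0_1126259462-4_generation".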

nodes/merge_node.py
from ..node import Node
class MergeNode(Node):
def __init__(self, inputs, parallel_node_list, detectors, dag):
super().__init__(inputs)
self.dag = dag
self.job_name = "{}_merge".format(parallel_node_list[0].base_job_name)
self.label = "{}_merge".format(parallel_node_list[0].base_job_name)
self.request_cpus = 1
self.setup_arguments(
add_ini=False, add_unknown_args=False, add_command_line_args=False
)
self.arguments.append("--result")
for pn in parallel_node_list:
self.arguments.append(pn.result_file)
self.arguments.add("outdir", self.inputs.result_directory)
self.arguments.add("label", self.label)
self.arguments.add_flag("merge")
self.process_node()
for pn in parallel_node_list:
self.job.add_parent(pn.job)
@property
def executable(self):
return self._get_executable_path("bilby_result")
@property
def request_memory(self):
return "16 GB"
@property
<