
Added support for submitting jobs to the Open Science Grid

Merged: Duncan Macleod requested to merge duncanmmacleod/bilby_pipe:osg into master
Files changed: 3 (+191 additions, -61 deletions)
@@ -10,6 +10,7 @@ import os
 import shutil
 import sys
 import subprocess
+from pathlib import Path
 
 import pycondor
@@ -80,6 +81,7 @@ class MainInput(Input):
         self.coherence_test = args.coherence_test
         self.n_parallel = args.n_parallel
         self.transfer_files = args.transfer_files
+        self.osg = args.osg
         self.waveform_approximant = args.waveform_approximant
         self.likelihood_type = args.likelihood_type
@@ -319,6 +321,7 @@ class Dag(object):
         requirements=None,
         retry=None,
         verbose=0,
+        extra_lines=None,
     ):
         self.request_disk = request_disk
         self.request_cpus = inputs.request_cpus
@@ -330,6 +333,7 @@ class Dag(object):
         self.retry = retry
         self.verbose = verbose
         self.inputs = inputs
+        self.extra_lines = list(extra_lines or [])
         if self.inputs.n_level_A_jobs == 0:
             raise BilbyPipeError("ini file contained no data-generation requirement")
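The new `extra_lines` constructor argument gives callers a hook for injecting site-specific submit lines into every job in the DAG. A minimal usage sketch (the `inputs` object and the ClassAd value here are hypothetical, not taken from this diff):

>>> dag = Dag(inputs, extra_lines=['+MySiteAttribute = "1"'])

Since each job builds on a copy (`extra_lines = list(self.extra_lines)`), per-job additions such as the OSG options below never leak back into the shared list.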
@@ -470,16 +474,23 @@ class Dag(object):
         if job_input.meta_label is not None:
             job_name = "_".join([job_name, job_input.meta_label])
         job_name = job_name.replace(".", "-")
-        job_logs_base = os.path.join(
-            self.inputs.data_generation_log_directory, job_name
-        )
         submit = self.inputs.submit_directory
-        extra_lines = ""
-        for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+        extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.data_generation_log_directory, job_name
+            )
+        )
+        extra_lines.append("accounting_group = {}".format(self.inputs.accounting))
+        if universe != "local" and self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(
+                self.generation_executable, has_ligo_frames=True
+            )
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         arguments = ArgumentsString()
         if self.inputs.use_singularity:
@@ -507,7 +518,7 @@ class Dag(object):
             universe=universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             extra_lines=extra_lines,
             dag=self.dag,
             arguments=arguments.print(),
@@ -588,25 +599,18 @@ class Dag(object):
             job_name = "_".join([job_name, job_input.meta_label])
         job_name = job_name.replace(".", "-")
         job_name += "_{}".format(run_id)
-        job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
         submit = self.inputs.submit_directory
-        extra_lines = ""
-        for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
-        extra_lines += "\n+WantCheckpointSignal = True"
-        extra_lines += "\n+WantFTOnCheckpoint = True"
-        extra_lines += "\n+SuccessCheckpointExitCode = 130"
-        if self.inputs.transfer_files:
-            extra_lines += "\nshould_transfer_files = YES"
-            extra_lines += "\ntransfer_output_files = {}".format(
-                self.inputs.result_directory
-            )
+        requirements = [self.requirements] if self.requirements else []
+        extra_lines = list(self.extra_lines)
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.data_analysis_log_directory, job_name
+            )
+            + self._checkpoint_submit_lines()
+            + ["accounting_group = {}".format(self.inputs.accounting)]
+        )
+        if self.inputs.transfer_files or self.inputs.osg:
             data_dump_file = DataDump.get_filename(
                 self.inputs.data_directory, self.generation_job_labels[idx], idx
             )
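Net effect for an analysis job: the submit file now carries the checkpoint ClassAds next to the log paths and accounting group. An illustrative fragment (a sketch; `<logdir>`, `<job>` and `<group>` stand in for the configured values):

log = <logdir>/<job>_$(Cluster)_$(Process).log
output = <logdir>/<job>_$(Cluster)_$(Process).out
error = <logdir>/<job>_$(Cluster)_$(Process).err
+CheckpointExitBySignal = False
+CheckpointExitCode = 130
+SuccessCheckpointExitBySignal = False
+SuccessCheckpointExitCode = 130
+WantCheckpointSignal = True
+CheckpointSig = "SIGTERM"
accounting_group = <group>

Note that the old `+WantFTOnCheckpoint = True` line is dropped in favour of the `_checkpoint_submit_lines()` helper defined further down.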
@@ -618,11 +622,17 @@ class Dag(object):
             distance_marg_cache_file = ".distance_marginalization_lookup.npz"
             if os.path.isfile(distance_marg_cache_file):
                 input_files_to_transfer.append(distance_marg_cache_file)
-            extra_lines += "\ntransfer_input_files = {}".format(
-                ",".join(input_files_to_transfer)
-            )
-            extra_lines += "\nwhen_to_transfer_output = ON_EXIT_OR_EVICT"
-            extra_lines += "\nstream_error = True\nstream_output = True"
+            extra_lines.extend(
+                self._condor_file_transfer_lines(
+                    input_files_to_transfer,
+                    [self._relative_topdir(self.inputs.outdir, self.initialdir)],
+                )
+            )
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(self.analysis_executable)
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         arguments = ArgumentsString()
         if self.inputs.use_singularity:
@@ -652,7 +662,7 @@ class Dag(object):
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
            extra_lines=extra_lines,
             dag=self.dag,
             arguments=arguments.print(),
@@ -676,17 +686,22 @@ class Dag(object):
         job_name = "{}_postprocessing".format(self.inputs.label)
         submit = self.inputs.submit_directory
-        job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
-        extra_lines = ""
-        for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+        extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.data_analysis_log_directory, job_name
+            )
+            + ["accounting_group = {}".format(self.inputs.accounting)]
+        )
         exe = shutil.which(self.inputs.postprocessing_executable)
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(exe)
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         job = pycondor.Job(
             name=job_name,
             executable=exe,
@@ -695,7 +710,7 @@ class Dag(object):
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             dag=self.dag,
             extra_lines=extra_lines,
             arguments=self.inputs.postprocessing_arguments,
@@ -730,14 +745,15 @@ class Dag(object):
             return
         job_name = "_".join([self.inputs.label, "merge_runs"])
         submit = self.inputs.submit_directory
-        job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
-        extra_lines = ""
-        for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+        extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.data_analysis_log_directory, job_name
+            )
+            + ["accounting_group = {}".format(self.inputs.accounting)]
+        )
         exe = shutil.which("bilby_result")
         arguments = "-r {} --merge --outdir {} --label {}".format(
@@ -746,6 +762,11 @@ class Dag(object):
             self.merged_runs_label,
         )
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(exe)
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         job = pycondor.Job(
             name=job_name,
             executable=exe,
@@ -754,7 +775,7 @@ class Dag(object):
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             dag=self.dag,
             extra_lines=extra_lines,
             arguments=arguments,
@@ -783,16 +804,13 @@ class Dag(object):
         for file, parent_job in zip(files, parent_jobs):
             job_name = parent_job.name + "_plot"
-            job_logs_base = os.path.join(
-                self.inputs.data_analysis_log_directory, job_name
-            )
-            extra_lines = ""
-            extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
-            for arg in ["error", "log", "output"]:
-                extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
-                    arg, job_logs_base, arg[:3]
-                )
+            extra_lines = list(self.extra_lines)
+            extra_lines.extend(
+                self._log_output_error_submit_lines(
+                    self.inputs.data_analysis_log_directory, job_name
+                )
+                + ["accounting_group = {}".format(self.inputs.accounting)]
+            )
             arguments = ArgumentsString()
             arguments.add_positional_argument(self.inputs.ini)
@@ -833,14 +851,32 @@ class Dag(object):
         existing_dir = self.inputs.existing_dir
         job_name = "_".join([self.inputs.label, "results_page"])
         job_name = job_name.replace(".", "-")
-        job_logs_base = os.path.join(self.inputs.summary_log_directory, job_name)
         submit = self.inputs.submit_directory
-        extra_lines = ""
-        for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+        extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.summary_log_directory, job_name
+            )
+            + ["accounting_group = {}".format(self.inputs.accounting)]
+        )
+        if self.inputs.transfer_files or self.inputs.osg:
+            extra_lines.extend(
+                self._condor_file_transfer_lines(
+                    [str(self.inputs.ini)] + files,
+                    [self._relative_topdir(self.inputs.outdir, self.initialdir)],
+                )
+            )
+            # condor transfers all files into a flat structure
+            files = list(map(os.path.basename, files))
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(self.summary_executable)
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         arguments = ArgumentsString()
         arguments.add("webdir", webdir)
         arguments.add("email", email)
@@ -866,7 +902,7 @@ class Dag(object):
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             extra_lines=extra_lines,
             dag=self.dag,
             arguments=arguments.print(),
@@ -902,6 +938,100 @@ class Dag(object):
                 )
             )
 
+    @staticmethod
+    def _log_output_error_submit_lines(logdir, prefix):
+        """Returns the filepaths for condor log, output, and error options
+
+        Parameters
+        ----------
+        logdir : str
+            the target directory for the files
+        prefix : str
+            the prefix for the files
+
+        Returns
+        -------
+        log, output, error : list of str
+            the list of three file paths to be passed to pycondor.Job
+
+        Examples
+        --------
+        >>> Dag._log_output_error_submit_lines("test", "job")
+        ['log = test/job_$(Cluster)_$(Process).log',
+         'output = test/job_$(Cluster)_$(Process).out',
+         'error = test/job_$(Cluster)_$(Process).err']
+        """
+        logpath = Path(logdir)
+        filename = "{}_$(Cluster)_$(Process).{{}}".format(prefix)
+        return [
+            "{} = {}".format(opt, str(logpath / filename.format(opt[:3])))
+            for opt in ("log", "output", "error")
+        ]
+    @staticmethod
+    def _checkpoint_submit_lines(code=130):
+        return [
+            # needed for htcondor < 8.9.x (probably)
+            "+CheckpointExitBySignal = False",
+            "+CheckpointExitCode = {}".format(code),
+            # htcondor >= 8.9.x (probably)
+            "+SuccessCheckpointExitBySignal = False",
+            "+SuccessCheckpointExitCode = {}".format(code),
+            # ask condor to provide the checkpoint signals
+            "+WantCheckpointSignal = True",
+            '+CheckpointSig = "SIGTERM"',
+        ]
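For reference, the helper expands to the following submit lines with the default exit code (a doctest-style sketch, derived directly from the method above):

>>> Dag._checkpoint_submit_lines()
['+CheckpointExitBySignal = False',
 '+CheckpointExitCode = 130',
 '+SuccessCheckpointExitBySignal = False',
 '+SuccessCheckpointExitCode = 130',
 '+WantCheckpointSignal = True',
 '+CheckpointSig = "SIGTERM"']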
+    @staticmethod
+    def _condor_file_transfer_lines(inputs, outputs):
+        return [
+            "should_transfer_files = YES",
+            "transfer_input_files = {}".format(",".join(inputs)),
+            "transfer_output_files = {}".format(",".join(outputs)),
+            "when_to_transfer_output = ON_EXIT_OR_EVICT",
+            "stream_error = True",
+            "stream_output = True",
+        ]
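An illustration with hypothetical file names (the paths are placeholders, not part of the diff):

>>> Dag._condor_file_transfer_lines(["data/dump.pickle"], ["outdir"])
['should_transfer_files = YES',
 'transfer_input_files = data/dump.pickle',
 'transfer_output_files = outdir',
 'when_to_transfer_output = ON_EXIT_OR_EVICT',
 'stream_error = True',
 'stream_output = True']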
+    @staticmethod
+    def _relative_topdir(path, reference):
+        """Returns the top-level directory name of a path relative
+        to a reference
+        """
+        try:
+            return str(Path(path).resolve().relative_to(reference))
+        except ValueError as exc:
+            exc.args = ("cannot format {} relative to {}".format(path, reference),)
+            raise
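A sketch with hypothetical paths (note that `Path.resolve()` consults the filesystem, so the exact result can depend on symlinks):

>>> Dag._relative_topdir("/home/user/runs/outdir", "/home/user")
'runs/outdir'

If `path` does not sit under `reference`, the re-raised ValueError reads `cannot format <path> relative to <reference>`.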
+    def _osg_submit_options(self, executable, has_ligo_frames=False):
+        """Returns the extra submit lines and requirements to enable running
+        a job on the Open Science Grid
+
+        Returns
+        -------
+        lines : list
+            the list of extra submit lines to include
+        requirements : str
+            the extra requirements line to include
+        """
+        # required for OSG submission
+        lines = ["+OpenScienceGrid = True"]
+        requirements = ["(IS_GLIDEIN=?=True)"]
+        # if we need GWF data:
+        if has_ligo_frames:
+            requirements.append("(HAS_LIGO_FRAMES=?=True)")
+        # if we need singularity:
+        if self.inputs.use_singularity:
+            requirements.append("(HAS_SINGULARITY=?=True)")
+        # otherwise, if we need the ligo-containers /cvmfs repo:
+        elif executable.startswith("/cvmfs/ligo-containers.opensciencegrid.org"):
+            requirements.append("(HAS_CVMFS_LIGO_CONTAINERS=?=True)")
+        return lines, " && ".join(requirements)
 
 def main():
     """ Top-level interface for bilby_pipe """