Added support for submitting jobs to the Open Science Grid

Merged. Duncan Macleod requested to merge `duncanmmacleod/bilby_pipe:osg` into `master`.
2 files changed: +124 -24
Commit 9a933c1c: bilby_pipe: support running on OSG
Duncan Macleod authored
    This commit is a little larger than planned, but does the following:
    
    - abstracts checkpointing options to a method, and updates them to the
      minimal working set
    - abstracts condor file transfer options to a method, and implements for
      pesummary jobs
    - adds OSG running options to jobs and the `--osg` command-line option
      for `bilby_pipe`
    
The above necessitated a bit of restructuring in some methods to support adding extra `requirements` to jobs; a minimal sketch of that pattern follows.
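A minimal, self-contained sketch of the new `requirements` handling (the helper below is illustrative only, not the bilby_pipe API; the real code is in the diff):

```python
def combine_requirements(base, extra):
    """Join a job's optional base requirements with extra clauses,
    as each Dag job builder now does before calling pycondor.Job."""
    requirements = [base] if base else []
    requirements.extend(extra)
    return " && ".join(requirements)

print(combine_requirements(
    "(Memory > 4096)",        # hypothetical Dag-level requirement
    ["(IS_GLIDEIN=?=True)"],  # OSG clause returned by _osg_submit_options
))
# -> (Memory > 4096) && (IS_GLIDEIN=?=True)
```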
+118 -24
@@ -81,6 +81,7 @@ class MainInput(Input):
         self.coherence_test = args.coherence_test
         self.n_parallel = args.n_parallel
         self.transfer_files = args.transfer_files
+        self.osg = args.osg
         self.waveform_approximant = args.waveform_approximant
         self.likelihood_type = args.likelihood_type
@@ -475,6 +476,8 @@ class Dag(object):
         job_name = job_name.replace(".", "-")
         submit = self.inputs.submit_directory
         extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
         extra_lines.extend(self._log_output_error_submit_lines(
             self.inputs.data_generation_log_directory,
             job_name,
@@ -483,6 +486,14 @@
             "accounting_group = {}".format(self.inputs.accounting)
         )
+        if universe != "local" and self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(
+                self.generation_executable,
+                has_ligo_frames=True,
+            )
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         arguments = ArgumentsString()
         if self.inputs.use_singularity:
             arguments.append(
@@ -509,7 +520,7 @@
             universe=universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             extra_lines=extra_lines,
             dag=self.dag,
             arguments=arguments.print(),
@@ -591,26 +602,20 @@
         job_name = job_name.replace(".", "-")
         job_name += "_{}".format(run_id)
         submit = self.inputs.submit_directory
+        requirements = [self.requirements] if self.requirements else []
         extra_lines = list(self.extra_lines)
         extra_lines.extend(
             self._log_output_error_submit_lines(
                 self.inputs.data_analysis_log_directory,
                 job_name,
-            ) + [
+            ) +
+            self._checkpoint_submit_lines() +
+            [
                 "accounting_group = {}".format(self.inputs.accounting),
-                "+WantCheckpointSignal = True",
-                "+WantFTOnCheckpoint = True",
-                "+SuccessCheckpointExitCode = 130",
-            ],
+            ]
         )
-        if self.inputs.transfer_files:
-            extra_lines.extend((
-                "should_transfer_files = YES",
-                "transfer_output_files = {}".format(
-                    self.inputs.result_directory
-                ),
-            ))
+        if self.inputs.transfer_files or self.inputs.osg:
             data_dump_file = DataDump.get_filename(
                 self.inputs.data_directory, self.generation_job_labels[idx], idx
             )
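(Review note: with the default `code=130`, the new `_checkpoint_submit_lines` helper, defined at the bottom of this diff, expands to the submit lines below; the import path is assumed.)

```python
from bilby_pipe.main import Dag  # assumed module path

print(Dag._checkpoint_submit_lines())
# ['+CheckpointExitBySignal = False',
#  '+CheckpointExitCode = 130',
#  '+SuccessCheckpointExitBySignal = False',
#  '+SuccessCheckpointExitCode = 130']
```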
@@ -622,15 +627,18 @@
             distance_marg_cache_file = ".distance_marginalization_lookup.npz"
             if os.path.isfile(distance_marg_cache_file):
                 input_files_to_transfer.append(distance_marg_cache_file)
-            extra_lines.extend((
-                "transfer_input_files = {}".format(
-                    ",".join(input_files_to_transfer)
-                ),
-                "when_to_transfer_output = ON_EXIT_OR_EVICT",
-                "stream_error = True",
-                "stream_output = True",
+            extra_lines.extend(self._condor_file_transfer_lines(
+                input_files_to_transfer,
+                [self.inputs.result_directory],
             ))
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(
+                self.analysis_executable,
+            )
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         arguments = ArgumentsString()
         if self.inputs.use_singularity:
             arguments.append(
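(Review note: for illustration, the new `_condor_file_transfer_lines` helper produces submit lines like these; the file names and import path are hypothetical.)

```python
from bilby_pipe.main import Dag  # assumed module path

print(Dag._condor_file_transfer_lines(
    ["config.ini", "data_dump.pickle"],  # hypothetical input files
    ["result"],                          # hypothetical output directory
))
# ['should_transfer_files = YES',
#  'transfer_input_files = config.ini,data_dump.pickle',
#  'transfer_output_files = result',
#  'when_to_transfer_output = ON_EXIT_OR_EVICT',
#  'stream_error = True',
#  'stream_output = True']
```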
@@ -659,7 +667,7 @@
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             extra_lines=extra_lines,
             dag=self.dag,
             arguments=arguments.print(),
@@ -684,6 +692,7 @@
         job_name = "{}_postprocessing".format(self.inputs.label)
         submit = self.inputs.submit_directory
         extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
         extra_lines.extend(
             self._log_output_error_submit_lines(
                 self.inputs.data_analysis_log_directory,
@@ -695,6 +704,11 @@
         exe = shutil.which(self.inputs.postprocessing_executable)
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(exe)
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         job = pycondor.Job(
             name=job_name,
             executable=exe,
@@ -703,7 +717,7 @@
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             dag=self.dag,
             extra_lines=extra_lines,
             arguments=self.inputs.postprocessing_arguments,
@@ -739,6 +753,8 @@
         job_name = "_".join([self.inputs.label, "merge_runs"])
         submit = self.inputs.submit_directory
         extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
         extra_lines.extend(
             self._log_output_error_submit_lines(
                 self.inputs.data_analysis_log_directory,
@@ -755,6 +771,11 @@
             self.merged_runs_label,
         )
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(exe)
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         job = pycondor.Job(
             name=job_name,
             executable=exe,
@@ -763,7 +784,7 @@
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             dag=self.dag,
             extra_lines=extra_lines,
             arguments=arguments,
@@ -843,6 +864,8 @@
         job_name = job_name.replace(".", "-")
         submit = self.inputs.submit_directory
         extra_lines = list(self.extra_lines)
+        requirements = [self.requirements] if self.requirements else []
         extra_lines.extend(
             self._log_output_error_submit_lines(
                 self.inputs.summary_log_directory,
@@ -851,6 +874,20 @@
                 "accounting_group = {}".format(self.inputs.accounting),
             ],
         )
+        if self.inputs.transfer_files or self.inputs.osg:
+            extra_lines.extend(self._condor_file_transfer_lines(
+                [str(self.inputs.ini)] + files,
+                [webdir],
+            ))
+        if self.inputs.osg:
+            _osg_lines, _osg_reqs = self._osg_submit_options(
+                self.summary_executable,
+            )
+            extra_lines.extend(_osg_lines)
+            requirements.append(_osg_reqs)
         arguments = ArgumentsString()
         arguments.add("webdir", webdir)
         arguments.add("email", email)
@@ -876,7 +913,7 @@
             universe=self.universe,
             initialdir=self.initialdir,
             notification=self.notification,
-            requirements=self.requirements,
+            requirements=" && ".join(requirements),
             extra_lines=extra_lines,
             dag=self.dag,
             arguments=arguments.print(),
@@ -942,6 +979,63 @@
                 str(logpath / filename.format(opt[:3])),
             ) for opt in ("log", "output", "error")]
+
+    @staticmethod
+    def _checkpoint_submit_lines(code=130):
+        return [
+            # needed for htcondor < 8.9.x (probably)
+            "+CheckpointExitBySignal = False",
+            "+CheckpointExitCode = {}".format(code),
+            # htcondor >= 8.9.x (probably)
+            "+SuccessCheckpointExitBySignal = False",
+            "+SuccessCheckpointExitCode = {}".format(code),
+        ]
+
+    @staticmethod
+    def _condor_file_transfer_lines(inputs, outputs):
+        return [
+            "should_transfer_files = YES",
+            "transfer_input_files = {}".format(",".join(inputs)),
+            "transfer_output_files = {}".format(",".join(outputs)),
+            "when_to_transfer_output = ON_EXIT_OR_EVICT",
+            "stream_error = True",
+            "stream_output = True",
+        ]
+
+    def _osg_submit_options(self, executable, has_ligo_frames=False):
+        """Returns the extra submit lines and requirements to enable running
+        a job on the Open Science Grid
+
+        Returns
+        -------
+        lines : list
+            the list of extra submit lines to include
+        requirements : str
+            the extra requirements line to include
+        """
+        # required for OSG submission
+        lines = [
+            "+OpenScienceGrid = True",
+        ]
+        requirements = [
+            "(IS_GLIDEIN=?=True)",
+        ]
+        # if we need GWF data:
+        if has_ligo_frames:
+            requirements.append("(HAS_LIGO_FRAMES=?=True)")
+        # if we need singularity:
+        if self.inputs.use_singularity:
+            requirements.append("(HAS_SINGULARITY=?=True)")
+        # otherwise if we need the ligo-containers /cvmfs repo:
+        elif executable.startswith(
+            "/cvmfs/ligo-containers.opensciencegrid.org"
+        ):
+            requirements.append("(HAS_CVMFS_LIGO_CONTAINERS=?=True)")
+        return lines, " && ".join(requirements)
+
+
 def main():
     """ Top-level interface for bilby_pipe """
     parser = create_parser(top_level=True)
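As a usage illustration (not part of the change), `_osg_submit_options` can be exercised with a stand-in for the `Dag` instance; the executable path and import path below are hypothetical:

```python
from types import SimpleNamespace

from bilby_pipe.main import Dag  # assumed module path

# stand-in for a Dag whose inputs enable singularity
fake_dag = SimpleNamespace(inputs=SimpleNamespace(use_singularity=True))

lines, reqs = Dag._osg_submit_options(
    fake_dag, "/usr/bin/bilby_pipe_analysis", has_ligo_frames=True,
)
print(lines)  # ['+OpenScienceGrid = True']
print(reqs)
# (IS_GLIDEIN=?=True) && (HAS_LIGO_FRAMES=?=True) && (HAS_SINGULARITY=?=True)
```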