
Added support for submitting jobs to the Open Science Grid

Merged Duncan Macleod requested to merge duncanmmacleod/bilby_pipe:osg into master
1 file changed: +73 −52

The diff below factors the repeated construction of the condor log, output, and error submit lines into a single helper, Dag._log_output_error_submit_lines, and switches every job type (data generation, data analysis, postprocessing, merge, plot, results page) over to it.
@@ -10,6 +10,7 @@ import os
 import shutil
 import sys
 import subprocess
+from pathlib import Path
 import pycondor
@@ -472,15 +473,12 @@ class Dag(object):
         if job_input.meta_label is not None:
             job_name = "_".join([job_name, job_input.meta_label])
         job_name = job_name.replace(".", "-")
-        job_logs_base = os.path.join(
-            self.inputs.data_generation_log_directory, job_name
-        )
         submit = self.inputs.submit_directory
         extra_lines = list(self.extra_lines)
-        for arg in ["error", "log", "output"]:
-            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            ))
+        extra_lines.extend(self._log_output_error_submit_lines(
+            self.inputs.data_generation_log_directory,
+            job_name,
+        ))
         extra_lines.append(
             "accounting_group = {}".format(self.inputs.accounting)
         )
@@ -592,20 +590,19 @@ class Dag(object):
             job_name = "_".join([job_name, job_input.meta_label])
         job_name = job_name.replace(".", "-")
         job_name += "_{}".format(run_id)
-        job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
         submit = self.inputs.submit_directory
         extra_lines = list(self.extra_lines)
-        for arg in ["error", "log", "output"]:
-            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            ))
-        extra_lines.extend((
-            "accounting_group = {}".format(self.inputs.accounting),
-            "+WantCheckpointSignal = True",
-            "+WantFTOnCheckpoint = True",
-            "+SuccessCheckpointExitCode = 130",
-        ))
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.data_analysis_log_directory,
+                job_name,
+            ) + [
+                "accounting_group = {}".format(self.inputs.accounting),
+                "+WantCheckpointSignal = True",
+                "+WantFTOnCheckpoint = True",
+                "+SuccessCheckpointExitCode = 130",
+            ],
+        )
         if self.inputs.transfer_files:
             extra_lines.extend((
@@ -686,15 +683,14 @@ class Dag(object):
         job_name = "{}_postprocessing".format(self.inputs.label)
         submit = self.inputs.submit_directory
-        job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
         extra_lines = list(self.extra_lines)
-        for arg in ["error", "log", "output"]:
-            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            ))
-        extra_lines.append(
-            "accounting_group = {}".format(self.inputs.accounting)
-        )
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.data_analysis_log_directory,
+                job_name,
+            ) + [
+                "accounting_group = {}".format(self.inputs.accounting)
+            ],
+        )
         exe = shutil.which(self.inputs.postprocessing_executable)
@@ -742,15 +738,14 @@ class Dag(object):
             return
         job_name = "_".join([self.inputs.label, "merge_runs"])
         submit = self.inputs.submit_directory
-        job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
         extra_lines = list(self.extra_lines)
-        for arg in ["error", "log", "output"]:
-            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            ))
-        extra_lines.append(
-            "accounting_group = {}".format(self.inputs.accounting)
-        )
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.data_analysis_log_directory,
+                job_name,
+            ) + [
+                "accounting_group = {}".format(self.inputs.accounting)
+            ],
+        )
         exe = shutil.which("bilby_result")
@@ -797,18 +792,15 @@ class Dag(object):
         for file, parent_job in zip(files, parent_jobs):
             job_name = parent_job.name + "_plot"
-            job_logs_base = os.path.join(
-                self.inputs.data_analysis_log_directory, job_name
-            )
             extra_lines = list(self.extra_lines)
-            extra_lines.append(
-                "accounting_group = {}".format(self.inputs.accounting),
-            )
+            extra_lines.extend(
+                self._log_output_error_submit_lines(
+                    self.inputs.data_analysis_log_directory,
+                    job_name,
+                ) + [
+                    "accounting_group = {}".format(self.inputs.accounting),
+                ],
+            )
-            for arg in ["error", "log", "output"]:
-                extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
-                    arg, job_logs_base, arg[:3]
-                ))
             arguments = ArgumentsString()
             arguments.add_positional_argument(self.inputs.ini)
@@ -849,15 +841,15 @@ class Dag(object):
         existing_dir = self.inputs.existing_dir
         job_name = "_".join([self.inputs.label, "results_page"])
         job_name = job_name.replace(".", "-")
-        job_logs_base = os.path.join(self.inputs.summary_log_directory, job_name)
         submit = self.inputs.submit_directory
         extra_lines = list(self.extra_lines)
-        for arg in ["error", "log", "output"]:
-            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
-                arg, job_logs_base, arg[:3]
-            ))
-        extra_lines.append(
-            "accounting_group = {}".format(self.inputs.accounting),
-        )
+        extra_lines.extend(
+            self._log_output_error_submit_lines(
+                self.inputs.summary_log_directory,
+                job_name,
+            ) + [
+                "accounting_group = {}".format(self.inputs.accounting),
+            ],
+        )
         arguments = ArgumentsString()
         arguments.add("webdir", webdir)
@@ -920,6 +912,35 @@ class Dag(object):
             )
         )
 
+    @staticmethod
+    def _log_output_error_submit_lines(logdir, prefix):
+        """Returns the submit lines for the condor log, output, and error options
+
+        Parameters
+        ----------
+        logdir : str
+            the target directory for the files
+        prefix : str
+            the prefix for the files
+
+        Returns
+        -------
+        lines : list of str
+            the list of three submit lines to be passed to pycondor.Job
+
+        Examples
+        --------
+        >>> Dag._log_output_error_submit_lines("test", "job")
+        ['log = test/job_$(Cluster)_$(Process).log',
+         'output = test/job_$(Cluster)_$(Process).out',
+         'error = test/job_$(Cluster)_$(Process).err']
+        """
+        logpath = Path(logdir)
+        filename = "{}_$(Cluster)_$(Process).{{}}".format(prefix)
+        return [
+            "{} = {}".format(opt, str(logpath / filename.format(opt[:3])))
+            for opt in ("log", "output", "error")
+        ]
+
 def main():
     """ Top-level interface for bilby_pipe """