Added support for submitting jobs to the Open Science Grid

Merged: Duncan Macleod requested to merge duncanmmacleod/bilby_pipe:osg into master
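This MR threads a new extra_lines option through the Dag class so that additional HTCondor submit directives (for example, the ones needed to run jobs on the Open Science Grid) can be injected into every job in the workflow. A minimal sketch of the intended call pattern, assuming the constructor otherwise matches the hunks below; the two directives shown are illustrative placeholders, not values defined in this MR:

    # Hypothetical usage: pass site-specific HTCondor submit lines through
    # the new Dag(extra_lines=...) hook. The two directives below are
    # illustrative examples only, not lines added by this merge request.
    osg_lines = [
        "+WantsOSG = True",        # example custom ClassAd
        "request_memory = 4 GB",   # example resource request
    ]
    dag = Dag(inputs, extra_lines=osg_lines)
    # Each generation/analysis/postprocessing job now starts its submit
    # description from a copy of these lines.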
1 file changed: +55 −37
@@ -319,6 +319,7 @@ class Dag(object):
         requirements=None,
         retry=None,
         verbose=0,
+        extra_lines=None,
     ):
         self.request_disk = request_disk
         self.request_cpus = inputs.request_cpus
@@ -330,6 +331,7 @@ class Dag(object):
         self.retry = retry
         self.verbose = verbose
         self.inputs = inputs
+        self.extra_lines = list(extra_lines or [])
         if self.inputs.n_level_A_jobs == 0:
             raise BilbyPipeError("ini file contained no data-generation requirement")
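The list(extra_lines or []) idiom does two jobs at once: it normalises None to an empty list, and it takes a copy so that the per-job additions made later never leak back into the caller's list. A standalone illustration (names are made up):

    # Standalone illustration of the normalisation above.
    caller_lines = ["+WantsOSG = True"]
    stored = list(caller_lines or [])             # copy, not a reference
    stored.append("accounting_group = demo.grp")  # per-job addition
    assert caller_lines == ["+WantsOSG = True"]   # caller's list untouched
    assert list(None or []) == []                 # None becomes []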
@@ -474,12 +476,14 @@ class Dag(object):
             self.inputs.data_generation_log_directory, job_name
         )
         submit = self.inputs.submit_directory
-        extra_lines = ""
+        extra_lines = list(self.extra_lines)
         for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
+            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
                 arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+            ))
+        extra_lines.append(
+            "accounting_group = {}".format(self.inputs.accounting)
+        )
         arguments = ArgumentsString()
         if self.inputs.use_singularity:
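For reference, the rewritten loop emits one submit line per log stream, with arg[:3] supplying the conventional three-letter extension (err, log, out). Run standalone with a made-up job_logs_base:

    # What the loop above appends, for a made-up job_logs_base.
    job_logs_base = "logs/example_generation"
    extra_lines = []
    for arg in ["error", "log", "output"]:
        extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
            arg, job_logs_base, arg[:3]
        ))
    # extra_lines now holds:
    #   error = logs/example_generation_$(Cluster)_$(Process).err
    #   log = logs/example_generation_$(Cluster)_$(Process).log
    #   output = logs/example_generation_$(Cluster)_$(Process).out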
@@ -590,23 +594,26 @@ class Dag(object):
             job_name += "_{}".format(run_id)
         job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
         submit = self.inputs.submit_directory
-        extra_lines = ""
+        extra_lines = list(self.extra_lines)
         for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
+            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
                 arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
-        extra_lines += "\n+WantCheckpointSignal = True"
-        extra_lines += "\n+WantFTOnCheckpoint = True"
-        extra_lines += "\n+SuccessCheckpointExitCode = 130"
+            ))
+        extra_lines.extend((
+            "accounting_group = {}".format(self.inputs.accounting),
+            "+WantCheckpointSignal = True",
+            "+WantFTOnCheckpoint = True",
+            "+SuccessCheckpointExitCode = 130",
+        ))
         if self.inputs.transfer_files:
-            extra_lines += "\nshould_transfer_files = YES"
-            extra_lines += "\ntransfer_output_files = {}".format(
-                self.inputs.result_directory
-            )
+            extra_lines.extend((
+                "should_transfer_files = YES",
+                "transfer_output_files = {}".format(
+                    self.inputs.result_directory
+                ),
+            ))
             data_dump_file = DataDump.get_filename(
                 self.inputs.data_directory, self.generation_job_labels[idx], idx
             )
@@ -618,11 +625,14 @@ class Dag(object):
             distance_marg_cache_file = ".distance_marginalization_lookup.npz"
             if os.path.isfile(distance_marg_cache_file):
                 input_files_to_transfer.append(distance_marg_cache_file)
-            extra_lines += "\ntransfer_input_files = {}".format(
-                ",".join(input_files_to_transfer)
-            )
-            extra_lines += "\nwhen_to_transfer_output = ON_EXIT_OR_EVICT"
-            extra_lines += "\nstream_error = True\nstream_output = True"
+            extra_lines.extend((
+                "transfer_input_files = {}".format(
+                    ",".join(input_files_to_transfer)
+                ),
+                "when_to_transfer_output = ON_EXIT_OR_EVICT",
+                "stream_error = True",
+                "stream_output = True",
+            ))
         arguments = ArgumentsString()
         if self.inputs.use_singularity:
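Taken together, the analysis-job lines above render to an HTCondor submit fragment along these lines (angle brackets mark placeholders for the corresponding inputs values; the directive names are taken verbatim from the diff):

    accounting_group = <inputs.accounting>
    +WantCheckpointSignal = True
    +WantFTOnCheckpoint = True
    +SuccessCheckpointExitCode = 130
    should_transfer_files = YES
    transfer_output_files = <inputs.result_directory>
    transfer_input_files = <comma-joined input_files_to_transfer>
    when_to_transfer_output = ON_EXIT_OR_EVICT
    stream_error = True
    stream_output = True

The checkpoint directives ask HTCondor to signal the job periodically and to treat exit code 130 as a successful checkpoint, so an evicted Open Science Grid job can resume from its last checkpoint rather than restart from scratch.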
@@ -678,12 +688,14 @@ class Dag(object):
         submit = self.inputs.submit_directory
         job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
-        extra_lines = ""
+        extra_lines = list(self.extra_lines)
         for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
+            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
                 arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+            ))
+        extra_lines.append(
+            "accounting_group = {}".format(self.inputs.accounting)
+        )
         exe = shutil.which(self.inputs.postprocessing_executable)
@@ -732,12 +744,14 @@ class Dag(object):
         submit = self.inputs.submit_directory
         job_logs_base = os.path.join(self.inputs.data_analysis_log_directory, job_name)
-        extra_lines = ""
+        extra_lines = list(self.extra_lines)
         for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
+            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
                 arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+            ))
+        extra_lines.append(
+            "accounting_group = {}".format(self.inputs.accounting)
+        )
         exe = shutil.which("bilby_result")
         arguments = "-r {} --merge --outdir {} --label {}".format(
@@ -787,12 +801,14 @@ class Dag(object):
             self.inputs.data_analysis_log_directory, job_name
         )
-        extra_lines = ""
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+        extra_lines = list(self.extra_lines)
+        extra_lines.append(
+            "accounting_group = {}".format(self.inputs.accounting),
+        )
         for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
+            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
                 arg, job_logs_base, arg[:3]
-            )
+            ))
         arguments = ArgumentsString()
         arguments.add_positional_argument(self.inputs.ini)
@@ -835,12 +851,14 @@ class Dag(object):
         job_name = job_name.replace(".", "-")
         job_logs_base = os.path.join(self.inputs.summary_log_directory, job_name)
         submit = self.inputs.submit_directory
-        extra_lines = ""
+        extra_lines = list(self.extra_lines)
         for arg in ["error", "log", "output"]:
-            extra_lines += "\n{} = {}_$(Cluster)_$(Process).{}".format(
+            extra_lines.append("{} = {}_$(Cluster)_$(Process).{}".format(
                 arg, job_logs_base, arg[:3]
-            )
-        extra_lines += "\naccounting_group = {}".format(self.inputs.accounting)
+            ))
+        extra_lines.append(
+            "accounting_group = {}".format(self.inputs.accounting),
+        )
         arguments = ArgumentsString()
         arguments.add("webdir", webdir)
         arguments.add("email", email)