Skip to content
Snippets Groups Projects
Commit 643a1359 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

add submit_description argument to Layer, replacing requirements

keep requirements for backwards compatibility with a deprecation warning

also refactor submit description generation for clarity
parent 0c071783
No related branches found
No related tags found
No related merge requests found
......@@ -6,6 +6,7 @@ import itertools
import os
import shutil
from typing import Any, Dict, List, Union
import warnings
import htcondor
......@@ -38,8 +39,13 @@ class Layer:
Whether to dynamically increase memory request if jobs are
put on hold due to going over memory requested.
Off by default.
submit_description
The submit descriptors representing this set of jobs.
requirements
Additional key-value pairs in the submit description.
The submit descriptors representing this set of jobs.
Deprecated in favor of submit_description to avoid confusion,
as 'requirements' refers to a specific submit descriptor.
This option will be removed in a future release.
nodes
The nodes representing the layer. Nodes can be passed upon
instantiation or added to the layer after the fact via
......@@ -55,6 +61,7 @@ class Layer:
transfer_files: bool = True
dynamic_memory: bool = False
requirements: dict = field(default_factory=dict)
submit_description: Union[dict, htcondor.Submit] = field(default_factory=dict)
nodes: list = field(default_factory=list)
inputs: dict = field(init=False, default_factory=dict)
outputs: dict = field(init=False, default_factory=dict)
......@@ -62,68 +69,25 @@ class Layer:
def __post_init__(self) -> None:
    """Finalize the layer after dataclass initialization.

    Defaults the layer name to the executable's basename, folds the
    deprecated ``requirements`` option into ``submit_description``
    (emitting a DeprecationWarning), and attaches any nodes passed at
    construction time via ``self.extend``.
    """
    if not self.name:
        self.name = os.path.basename(self.executable)
    if self.requirements:
        # fold deprecated option into submit_description; keys given in
        # requirements take precedence over colliding keys
        self.submit_description.update(self.requirements)
        # FIX: the two implicitly-concatenated string literals were
        # missing a separating space ("...submit_descriptionto avoid...")
        warnings.warn(
            "requirements has been deprecated in favor of submit_description "
            "to avoid confusion and will be removed in a future release",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not this frame
        )
    self.extend(self.nodes)
def config(self) -> Dict[str, Any]:
    """Return the DAG-level configuration describing this layer.

    Validates attached nodes, then merges layer defaults and
    boilerplate into the user-supplied submit description.
    """
    # check that nodes are valid before generating any configuration
    self.validate()
    # update submit description with defaults + other layer configuration
    description = self._update_submit_defaults(self.submit_description)
    return {
        "name": self.name,
        "submit_description": description,
        "vars": self._vars(),
        "retries": self.retries,
    }
......@@ -267,6 +231,60 @@ class Layer:
return allvars
def _update_submit_defaults(
    self, submit_description: Union[dict, htcondor.Submit]
) -> htcondor.Submit:
    """Build the final submit description for this layer.

    Starts from layer-wide defaults (universe, executable, arguments,
    periodic release), lets *submit_description* override any of them,
    then layers on file-transfer, logging, notification, and optional
    dynamic-memory descriptors.
    """
    # defaults first; user-supplied descriptors may override any of these
    options: Dict[str, Any] = {
        "universe": self.universe,
        "executable": shutil.which(self.executable),
        "arguments": self._arguments(),
        "periodic_release": "(HoldReasonCode == 5)",
    }
    options.update(submit_description)

    # file-transfer submit opts
    if self.transfer_files:
        input_files = self._inputs()
        output_files = self._outputs()
        remaps = self._output_remaps()
        if input_files or output_files:
            options["should_transfer_files"] = "YES"
            options["when_to_transfer_output"] = "ON_SUCCESS"
            options["success_exit_code"] = 0
            options["preserve_relative_paths"] = True
            if input_files:
                options["transfer_input_files"] = input_files
            if output_files:
                options["transfer_output_files"] = output_files
                options["transfer_output_remaps"] = f'"{remaps}"'

    # per-node log file locations
    log_base = f"{self.log_dir}/$(nodename)-$(cluster)-$(process)"
    options["output"] = f"{log_base}.out"
    options["error"] = f"{log_base}.err"

    # extra boilerplate submit opts
    options["notification"] = "never"

    # dynamic memory: re-request 1.5x the observed usage on release, and
    # also release jobs held long enough for reasons other than code 34
    # NOTE(review): assumes "request_memory" was supplied via the submit
    # description -- raises KeyError otherwise; confirm upstream callers
    if self.dynamic_memory:
        requested = options["request_memory"]
        options["+MemoryUsage"] = f"( {requested} )"
        options["request_memory"] = "( MemoryUsage ) * 3 / 2"
        hold_condition = (
            "((CurrentTime - EnteredCurrentStatus > 180)"
            " && (HoldReasonCode != 34))"
        )
        options["periodic_release"] = " || ".join(
            [options["periodic_release"], hold_condition]
        )

    return htcondor.Submit(options)
@dataclass
class Node:
......
......@@ -13,7 +13,7 @@ def test_dag_generation(mock_which, shared_datadir, tmp_path):
requirements = {"request_cpus": 1, "request_memory": 2000}
# create processing layer, add nodes
process_layer = Layer("process_bins", requirements=requirements)
process_layer = Layer("process_bins", submit_description=requirements)
output_files = []
for i in range(3):
output_file = f"output_{i}.txt"
......@@ -32,7 +32,7 @@ def test_dag_generation(mock_which, shared_datadir, tmp_path):
dag.attach(process_layer)
# create combine layer, add node
combine_layer = Layer("combine_bins", requirements=requirements)
combine_layer = Layer("combine_bins", submit_description=requirements)
combine_layer += Node(
arguments=Option("verbose"),
inputs=Argument("input", output_files),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment