Dynamic App Control
Motivation
This MR adds dynamic control over pipeline behavior to sgn
. Dynamic control is implemented with the following design principles:
-
Everything is in the graph. No side-effect based message passing, indirect callables, etc. All controls are directly detectable in the graph structure of the
Pipeline
object. -
Backwards Compatible. Controllability is optional and has to be explicitly chosen. All
sgn
base classes are unchanged. - Extensible App Support. A sufficiently general api is exposed to allow for custom applications behavior regarding controls. Signal handler support is included, but an arbitrary python process is all that is required.
Changes
This MR touches most of sgn
, though doesn't change any existing default behavior!
Additions
This MR adds several thematic groups of features:
New controls
module for base classes that establish behavior for sending and receiving special frames related to controls:
-
Control
: a callable that is applied to an element to manipulate the state of that element -
ControlSourcePad
: ASourcePad
subclass, used to distinguish types of pads -
ControlSinkPad
: ASinkPad
subclass, used to distinguish types of pads -
ControllableElement
: important mixin class for element classes likeCollectSink
that allow for explicit inclusion of controllability -
ControlSourceElement
: ASourceElement
subclass defining how control sources should behave -
QueueControlSource
: Amultiprocessing.Queue
based subclass ofControlSourceElement
useful out of the box
Helpful interface for running an arbitrary pipeline with controls:
-
apps.run_controlled
: helper function for running a Pipeline with controllability -
SignalHandlerManager
: a context manager for setting/unsetting signal handlers (needed for Keyboard Interrupt interception) -
SetEOSSignalHandler
: specific signal handler that sets theforce_eos
attribute toTrue
onSinkElement
Changes
Several small changes related to test coverage
Examples
Direct control via multiprocessing.Queue
This example shows direct control of a pipeline by submitting a Control
object to the input queue of a QueueControlSource
element
def _sample_controlled_pipeline(in_queue: Queue, out_queue: Queue):
"""Sample controlled pipeline."""
# Create the pipeline
p = Pipeline()
# Create source element and EOS control
src1 = IterSource(
name="src1",
source_pad_names=["O1"],
iter_map={"src1:src:O1": RandomIntIter(stop=5, seed=42, wait=1)},
)
ctrl = QueueControlSource(name="ctrl-eos", source_pad_names=["O1"], queue=in_queue)
# Create sink element
snk1 = CollectSink(
name="snk1",
sink_pad_names=["I1"],
collection_factory=lambda x: x,
collects={"snk1:sink:I1": out_queue},
)
# Insert elements into the pipeline
p.insert(src1, ctrl, snk1)
# Link the elements
p.link({"snk1:sink:I1": "src1:src:O1", "snk1:sink:ctrl": "ctrl-eos:src:O1"})
# Run the pipeline
p.run()
# Create process and queue
in_queue = Queue()
out_queue = Queue()
process = multiprocessing.Process(
target=_sample_controlled_pipeline, args=(in_queue, out_queue)
)
# Check input queue is empty
assert in_queue.empty()
# Start the process
process.start()
# Send an EOS control
in_queue.put(Control(name="EOS", call=controls.set_eos))
# Join the process
process.join()
# Check that the process is done
assert not process.is_alive()
# Check that the pipeline ran
assert not out_queue.empty()
# Check that the pipeline DID NOT run all the way
values = []
while not out_queue.empty():
values.append(out_queue.get())
assert len(values) < 5
assert len(values) > 0
Indirect control through signal.SIGINT
This example shows how a naive pipeline (one without explicit control elements) can still be controlled for safe shutdown with a signal handler for keyboard interruption.
def _sample_pipeline(out_queue: Queue):
"""Sample uncontrolled pipeline."""
# Create the pipeline
p = Pipeline()
# Create source element and EOS control
src1 = IterSource(
name="src1",
source_pad_names=["O1"],
iter_map={"src1:src:O1": RandomIntIter(stop=5, seed=42, wait=1)},
)
# Create sink element
snk1 = CollectSink(
name="snk1",
sink_pad_names=["I1"],
collection_factory=lambda x: x,
collects={"snk1:sink:I1": out_queue},
)
# Insert elements into the pipeline
p.insert(src1, snk1)
# Link the elements
p.link(
{
"snk1:sink:I1": "src1:src:O1",
}
)
return p
# Create process and queue
out_queue = Queue()
process, in_queue = run_controlled(
pipeline_factory=_sample_pipeline, join=False, out_queue=out_queue
)
# Wait 1 iteration
time.sleep(1)
# Send an EOS control via signal handler (have to duplicate signal handler
# context manager since we're testing it here)
with SignalHandlerManager(
{controls.signal.SIGINT: SetEOSSignalHandler(in_queue)}
):
signal.raise_signal(controls.signal.SIGINT)
# Join the process
process.join()
# Check that the process is done
assert not process.is_alive()
# Check that the pipeline ran
assert not out_queue.empty()
# Check that the pipeline DID NOT run all the way
values = []
while not out_queue.empty():
values.append(out_queue.get())
assert len(values) < 5
assert len(values) > 0