Dynamic App Control

James Kennington requested to merge feature-app-control into main

Motivation

This MR adds dynamic control over pipeline behavior to sgn. Dynamic control is implemented with the following design principles:

  1. Everything is in the graph. No side-effect-based message passing, indirect callables, etc. All controls are directly detectable in the graph structure of the Pipeline object.
  2. Backwards Compatible. Controllability is optional and has to be explicitly chosen. All sgn base classes are unchanged.
  3. Extensible App Support. A sufficiently general API is exposed to allow custom application behavior around controls. Signal handler support is included, but all that is required is an arbitrary Python process.
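
To make the first principle concrete, here is a minimal sketch of what "detectable in the graph structure" could look like from the outside. It does not use sgn. It only inspects a plain link dictionary written in the `element:direction:pad` format used by the examples in this MR. The function `find_control_links` and the rule that a control link is one whose sink pad is named `ctrl` are assumptions made for illustration, not part of the sgn API.

```python
from typing import Dict, List, Tuple

# Hypothetical convention for this sketch: a control link is any link whose
# sink pad name (the last ":"-separated field) is "ctrl".
CONTROL_PAD_NAME = "ctrl"


def find_control_links(links: Dict[str, str]) -> List[Tuple[str, str]]:
    """Return the (sink_pad, source_pad) pairs that carry controls."""
    found = []
    for sink_pad, source_pad in links.items():
        pad_name = sink_pad.rsplit(":", 1)[-1]
        if pad_name == CONTROL_PAD_NAME:
            found.append((sink_pad, source_pad))
    return found


# Same shape as the link map passed to Pipeline.link in the examples
links = {
    "snk1:sink:I1": "src1:src:O1",
    "snk1:sink:ctrl": "ctrl-eos:src:O1",
}
control_links = find_control_links(links)
```

Because the control path is an ordinary link, a reader or a tool can find it by looking at the links alone, with no hidden callbacks to trace.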

Changes

This MR touches most of sgn but does not change any existing default behavior.

Additions

This MR adds several thematic groups of features:

New controls module for base classes that establish behavior for sending and receiving special frames related to controls:

  • Control: a callable that is applied to an element to manipulate the state of that element
  • ControlSourcePad: A SourcePad subclass, used to distinguish types of pads
  • ControlSinkPad: A SinkPad subclass, used to distinguish types of pads
  • ControllableElement: a mixin class that element classes such as CollectSink use to opt in to controllability explicitly
  • ControlSourceElement: A SourceElement subclass defining how control sources should behave
  • QueueControlSource: A multiprocessing.Queue-based subclass of ControlSourceElement that is usable out of the box
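
The relationship between a control, a controllable element, and a queue-backed control source can be sketched without sgn. The classes below (`ToySink`, `ToyControl`) and the helper `apply_pending_controls` are hypothetical stand-ins written for this sketch; they are not the classes added by this MR and may differ from them in every detail. The only ideas taken from the MR are that a control is a callable applied to an element and that an EOS control sets `force_eos` on a sink.

```python
import queue
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ToySink:
    """Hypothetical stand-in for a controllable sink element."""

    name: str
    force_eos: bool = False


@dataclass
class ToyControl:
    """Hypothetical stand-in for a control: a named callable applied to an element."""

    name: str
    call: Callable[[ToySink], None]

    def __call__(self, element: ToySink) -> None:
        self.call(element)


def set_eos(element: ToySink) -> None:
    """Ask the element to stop by setting its force_eos flag."""
    element.force_eos = True


def apply_pending_controls(element: ToySink, control_queue: queue.Queue) -> List[str]:
    """Apply every control currently waiting in the queue; return their names."""
    applied = []
    while True:
        try:
            control = control_queue.get_nowait()
        except queue.Empty:
            break
        control(element)
        applied.append(control.name)
    return applied


sink = ToySink(name="snk1")
pending = queue.Queue()
pending.put(ToyControl(name="EOS", call=set_eos))
applied_names = apply_pending_controls(sink, pending)
```

The sketch uses `queue.Queue` so it runs in a single process; a cross-process version would swap in `multiprocessing.Queue`, which requires the controls to be picklable.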

Helpful interface for running an arbitrary pipeline with controls:

  • apps.run_controlled: helper function for running a Pipeline with controllability
  • SignalHandlerManager: a context manager for setting/unsetting signal handlers (needed for KeyboardInterrupt interception)
  • SetEOSSignalHandler: specific signal handler that sets the force_eos attribute to True on SinkElement
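
For readers unfamiliar with the pattern, the following is a minimal sketch of a context manager that installs signal handlers on entry and restores the previous ones on exit, using only the standard library. The names `temporary_signal_handlers` and `record_signal` are hypothetical. This is not the implementation of SignalHandlerManager, only an illustration of the general technique it is described as providing.

```python
import signal
from contextlib import contextmanager
from typing import Callable, Dict, Iterator


@contextmanager
def temporary_signal_handlers(handlers: Dict[int, Callable]) -> Iterator[None]:
    """Install handlers on entry and restore the previous ones on exit."""
    previous = {}
    try:
        for signum, handler in handlers.items():
            # signal.signal returns the handler that was installed before
            previous[signum] = signal.signal(signum, handler)
        yield
    finally:
        for signum, old_handler in previous.items():
            # A previous handler of None was not installed from Python
            # and cannot be passed back to signal.signal, so skip it
            if old_handler is not None:
                signal.signal(signum, old_handler)


received = []


def record_signal(signum, frame):
    """Hypothetical handler: record the signal instead of raising KeyboardInterrupt."""
    received.append(signum)


handler_before = signal.getsignal(signal.SIGINT)
with temporary_signal_handlers({signal.SIGINT: record_signal}):
    signal.raise_signal(signal.SIGINT)
handler_after = signal.getsignal(signal.SIGINT)
```

Python only allows `signal.signal` to be called from the main thread of the main interpreter, so a manager like this has to be entered there.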

Changes

Several small changes related to test coverage

Examples

Direct control via multiprocessing.Queue

This example shows direct control of a pipeline by submitting a Control object to the input queue of a QueueControlSource element.

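import multiprocessing
from multiprocessing import Queue

# Pipeline, IterSource, RandomIntIter, CollectSink, QueueControlSource, Control
# and the controls module come from sgn; their import paths are not shown here.

def _sample_controlled_pipeline(in_queue: Queue, out_queue: Queue):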
    """Sample controlled pipeline."""
    # Create the pipeline
    p = Pipeline()

    # Create source element and EOS control
    src1 = IterSource(
        name="src1",
        source_pad_names=["O1"],
        iter_map={"src1:src:O1": RandomIntIter(stop=5, seed=42, wait=1)},
    )
    ctrl = QueueControlSource(name="ctrl-eos", source_pad_names=["O1"], queue=in_queue)

    # Create sink element
    snk1 = CollectSink(
        name="snk1",
        sink_pad_names=["I1"],
        collection_factory=lambda x: x,
        collects={"snk1:sink:I1": out_queue},
    )

    # Insert elements into the pipeline
    p.insert(src1, ctrl, snk1)

    # Link the elements
    p.link({"snk1:sink:I1": "src1:src:O1", "snk1:sink:ctrl": "ctrl-eos:src:O1"})

    # Run the pipeline
    p.run()

# Create process and queue
in_queue = Queue()
out_queue = Queue()
process = multiprocessing.Process(
    target=_sample_controlled_pipeline, args=(in_queue, out_queue)
)

# Check input queue is empty
assert in_queue.empty()

# Start the process
process.start()

# Send an EOS control
in_queue.put(Control(name="EOS", call=controls.set_eos))

# Join the process
process.join()

# Check that the process is done
assert not process.is_alive()

# Check that the pipeline ran
assert not out_queue.empty()

# Check that the pipeline DID NOT run all the way
values = []
while not out_queue.empty():
    values.append(out_queue.get())
assert len(values) < 5
assert len(values) > 0
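
The drain loop above relies on `Queue.empty()`, which the Python documentation describes as not reliable for multiprocessing queues. As an alternative (not the approach used in this MR), a timeout-based drain can be sketched as follows. The helper name `drain` and its default timeout are assumptions for illustration. The demo uses `queue.Queue` so that it runs in a single process; `multiprocessing.Queue.get` raises the same `queue.Empty` exception on timeout.

```python
import queue
from typing import Any, List


def drain(q, timeout: float = 0.1) -> List[Any]:
    """Collect items until no new item arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


demo_queue = queue.Queue()
for value in (3, 1, 4):
    demo_queue.put(value)
drained = drain(demo_queue)
```

The trade-off is that each drain waits for one full timeout at the end, in exchange for not depending on `empty()`.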

Indirect control through signal.SIGINT

This example shows how a naive pipeline (one without explicit control elements) can still be controlled for safe shutdown with a signal handler for keyboard interruption.

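import signal
import time
from multiprocessing import Queue

# Pipeline, IterSource, RandomIntIter, CollectSink, run_controlled,
# SignalHandlerManager and SetEOSSignalHandler come from sgn; their import
# paths are not shown here.

def _sample_pipeline(out_queue: Queue):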
    """Sample uncontrolled pipeline."""
    # Create the pipeline
    p = Pipeline()

    # Create source element (no explicit control element in this pipeline)
    src1 = IterSource(
        name="src1",
        source_pad_names=["O1"],
        iter_map={"src1:src:O1": RandomIntIter(stop=5, seed=42, wait=1)},
    )

    # Create sink element
    snk1 = CollectSink(
        name="snk1",
        sink_pad_names=["I1"],
        collection_factory=lambda x: x,
        collects={"snk1:sink:I1": out_queue},
    )

    # Insert elements into the pipeline
    p.insert(src1, snk1)

    # Link the elements
    p.link(
        {
            "snk1:sink:I1": "src1:src:O1",
        }
    )

    return p

# Create process and queue
out_queue = Queue()
process, in_queue = run_controlled(
    pipeline_factory=_sample_pipeline, join=False, out_queue=out_queue
)

# Wait 1 iteration
time.sleep(1)

# Send an EOS control via the signal handler (the signal handler context
# manager is duplicated here because it is what this example tests)
with SignalHandlerManager({signal.SIGINT: SetEOSSignalHandler(in_queue)}):
    signal.raise_signal(signal.SIGINT)

# Join the process
process.join()

# Check that the process is done
assert not process.is_alive()

# Check that the pipeline ran
assert not out_queue.empty()

# Check that the pipeline DID NOT run all the way
values = []
while not out_queue.empty():
    values.append(out_queue.get())
assert len(values) < 5
assert len(values) > 0
Edited by James Kennington
