Skip to content

Add InternalPad to all Elements

James Kennington requested to merge feature-internal-pad into main

Motivation

An InternalPad allows each element to optionally perform an action between its sink pads (inputs) and source pads (outputs).

Original motivating use case was for SinkElements, which had to perform their IO or other "sink" behavior in the pull method, which isn't the appropriate behavior. The pull method is simply for pulling a given Frame onto a SinkPad, not for performing any intensive operations, analogous to an import statement (we don't want to do anything heavy on import!). With the distinction of an InternalPad, the SinkElement is able to take its "sink" action explicitly after all SinkPads have pulled data.

Changes

Additions:

Additions related to InternalPad concept

  • _InternalPadLike class
  • InternalPad class
  • internal_pad attribute on ElementLike class
  • internal method to ElementLike class, that does nothing by default

Additions related to graph outputs and testing / validating graph construction

  • to_graph method on Pipeline class
  • to_dot method on Pipeline class

Updates:

  • Inheritance MRO for element base classes, to allow ElementLike to have a name attribute. Specifically:
    • Each *Element class now only inherits from ElementLike
    • ElementLike now inherits from UniqueID instead of _PostInitBase

Removals:

This MR removes no key features / classes / attributes.

Examples

Motivating Example: Sinks

Using the internal sink pattern, sinks can now separate the pull and internal steps:

class CollectSink(SinkElement):
    ...

    def __post_init__(self):
        ...
        # Create attr for storing most recent inputs per pad
        self.inputs = {}

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        if frame.EOS:
            self.mark_eos(pad)

        self.inputs[pad.name] = frame


    def internal(self, pad: InternalPad) -> None:
        for pad, frame in self.inputs.items():
            if frame.data is not None:
                self.collects[pad].append(frame.data if self.extract_data else frame)

        self.inputs = {}

Merge request reports

Loading