Skip to content

check-for-complete-linking

Ryan Magee requested to merge check-for-linking into main

This just re-implements @joshua.gonsalves !15 (closed) linking check in the current codebase. A MWE is below:

from sgn import Pipeline, NullSource, NullSink
from sgn.base import Frame, TransformElement

class Transform(TransformElement):
    def __post_init__(self):
        super().__post_init__()
    def pull(self, pad, frame):
        return Frame()
    def transform(self, pad):
        return Frame()

# Create pipeline in one go
p = Pipeline()
p.insert(NullSource(name='src1',
                    source_pad_names=["H1"]),
         Transform(name='t1',
                    sink_pad_names=["H1"],
                    source_pad_names=["H1"]),
         NullSink(name='snk1',
                  sink_pad_names=["H1"]),
         link_map={"t1:sink:H1": "src1:src:H1","snk1:sink:H1": "t1:src:H1"})
p.run()

Adding "L1" as a source/sink pad within the source/transform/sink results in an error, which is desired:

Traceback (most recent call last):
  File "/Users/ryan/Code/greg/sgn/mwe.py", line 22, in <module>
    p.run()
  File "/Users/ryan/Code/greg/sgn/src/sgn/apps.py", line 272, in run
    for pad in element.source_pads: assert pad.is_linked, "All pads not linked"
AssertionError: All pads not linked

Merge request reports

Loading