Add Documentation and Unit Tests
Changes
This MR adds several things:
- ~100% test coverage
- documentation and CI docs build
- simple use-case example
-
MemorySink
sink class -
OperatorTransform
transform class for generic callable -
transform
decorator factory for syntactic sugar in simplified cases (chaining single output operations)
Testing
This MR has been tested using a new pytest test suite with comprehensive coverage
Examples
Operator Transforms
The operator transform takes:
-
func
(required): callable to be executed on the all frame inputs given as args -
input_processor
(optional): a preprocessor that can be applied to each input frame beforefunc
is called -
output_processor
(optional): a post-processor that can be used to modify the resulting frame before it is assigned to the output pad
def op_add(*args):
return sum(args)
elem = OperatorTransform(
name="add",
sink_pad_names=["Inp1", "Inp2"],
source_pad_names=["Out1"],
func=op_add
)
The above will essentially call Out1 = op_add(Inp1, Inp2)
when executed in a pipeline.
Using a Memory Sink
Memory sink is a simple solution for keeping a result of a pipeline accessible without serialization. The default memory is a dict, though anything satisfying the dict-interface can be used (e.g. HDFStore objects, etc).
# ... build pipeline
memory = {}
MemorySink(
name="snk1",
sink_pad_names=("Out1",),
memory=memory,
)
# ... run pipeline
print(memory['result'])
Full example of small calculation with scalars
This example uses the transform
decorator to provide a simpler UI for pipeline building.
from sgn import apps, sources, sinks
# Define some operators
@apps.transform()
def add_5(x):
return x + 5
@apps.transform()
def mult_2(x):
return x * 2
# Create a pipeline
p = apps.Pipeline()
# Create source element
src = sources.ConstantSource(value=10)
# Implicitly construct pipeline by calling the operators
v = add_5(p, src)
v = mult_2(p, v)
v = add_5(p, v)
# Create sink element and add to pipeline
memory = {}
snk = sinks.MemorySink(memory=memory, sink_pad_names=["Out1"], extractor=lambda frame: frame.value)
p.insert(snk, link_map={snk.sink_pads[0].name: v.source_pads[0].name})
# Run the pipeline
p.run()
# Check the result
assert memory["result"] == 35
Edited by James Kennington