Skip to content

Add Documentation and Unit Tests

James Kennington requested to merge feature-docs into main

Changes

This MR adds several things:

  • ~100% test coverage
  • documentation and CI docs build
  • simple use-case example
  • MemorySink sink class
  • OperatorTransform transform class for generic callable
  • transform decorator factory for syntactic sugar in simplified cases (chaining single output operations)

Testing

This MR has been tested using a new pytest test suite with comprehensive coverage

Examples

Operator Transforms

The operator transform takes:

  • func (required): callable to be executed on the all frame inputs given as args
  • input_processor (optional): a preprocessor that can be applied to each input frame before func is called
  • output_processor (optional): a post-processor that can be used to modify the resulting frame before it is assigned to the output pad
def op_add(*args):
    return sum(args)

elem = OperatorTransform(
    name="add",
    sink_pad_names=["Inp1", "Inp2"],
    source_pad_names=["Out1"],
    func=op_add
)

The above will essentially call Out1 = op_add(Inp1, Inp2) when executed in a pipeline.

Using a Memory Sink

Memory sink is a simple solution for keeping a result of a pipeline accessible without serialization. The default memory is a dict, though anything satisfying the dict-interface can be used (e.g. HDFStore objects, etc).

# ... build pipeline

memory = {}
MemorySink(
    name="snk1",
    sink_pad_names=("Out1",),
    memory=memory,
)

# ... run pipeline

print(memory['result'])

Full example of small calculation with scalars

This example uses the transform decorator to provide a simpler UI for pipeline building.

from sgn import apps, sources, sinks

# Define some operators
@apps.transform()
def add_5(x):
    return x + 5

@apps.transform()
def mult_2(x):
    return x * 2

# Create a pipeline
p = apps.Pipeline()

# Create source element
src = sources.ConstantSource(value=10)

# Implicitly construct pipeline by calling the operators
v = add_5(p, src)
v = mult_2(p, v)
v = add_5(p, v)

# Create sink element and add to pipeline
memory = {}
snk = sinks.MemorySink(memory=memory, sink_pad_names=["Out1"], extractor=lambda frame: frame.value)
p.insert(snk, link_map={snk.sink_pads[0].name: v.source_pads[0].name})

# Run the pipeline
p.run()

# Check the result
assert memory["result"] == 35
Edited by James Kennington

Merge request reports

Loading