Skip to content

End-to-end Workflow Examples

James Kennington requested to merge feature-deq-workflow into main

Changes

This MR updates the provided api in sources, sinks, and transforms to be generally useful without need for subclassing.

Adds:

"Null" elements for testing (essentially a refactor of "Fake" elements with more clear docs and names)

  • NullSource: source that always produces empty frames, only used for testing
  • NullSink: sink that does nothing, only used for testing

"Iter" elements for generating and collecting frames of a particular type

  • IterSource: Iterable based source element, one Iterable per source pad
  • CollectSink: Iterable based sink element, one Iterable per sink pad (must support .append syntax)
  • IterFrame: A Frame subclass whose data attribute must be iterable

"Deque" flavors of the above

  • DequeSource: DEQ based source element, one DEQ per source pad
  • DequeSink: DEQ based sink element, one DEQ per sink pad

"Generic N->M Transforms"

  • CallableTransform: transform element based on generic mappings from (inputs, ...) -> callable with control for source pad naming
  • helper function CallableTransform.from_callable for simple case of N==M==1

"Docs and Examples"

  • examples for end-to-end use
  • small example to docs
  • update docs to be in sync with README.md

Removes:

  • FakeSrc element that was only used for testing (not intended for extensibility / user api)
  • FakeSink element (same as above)

Motivation

Primary goals:

  • make base sgn package useful for generic workflows
  • provide iterable based source and sink elements for handling sequences of arbitrary python objects
  • provide transform element that can wrap arbitrary python callables

Obviates the need for older MRs: !26 (closed) !25 (closed)

Examples

Example 0: Trivial usage with Null elements

from sgn import Pipeline, NullSource, NullSink

# Create pipeline in one go
p = Pipeline()
p.insert(NullSource(name='src1',
                    source_pad_names=["H1"]),
         NullSink(name='snk1',
                  sink_pad_names=["H1"]),
         link_map={"snk1:sink:H1": "src1:src:H1"})
p.run()

Example 1: Simple example using a stream of scalars

from collections import deque
from sgn import Pipeline, CollectSink, DequeSource, CallableTransform

# Define a function to use in the pipeline
def add_ten(frame):
    return None if frame.data is None else frame.data + 10

# Create source element
src = DequeSource(
    name="src1",
    source_pad_names=["H1"],
    deqs={"src1:src:H1": deque([1, 2, 3])},
    limits=3
)

# Create a transform element using an arbitrary function
trn1 = CallableTransform.from_callable(
    name="t1",
    sink_pad_names=["H1"],
    callable=add_ten,
    output_name="H1",
)

# Create the sink so we can access the data after running
snk = CollectSink(
    name="snk1",
    sink_pad_names=("H1",),
)

# Create the Pipeline
p = Pipeline()

# Insert elements into pipeline and link them explicitly
p.insert(src, trn1, snk, link_map={
    "t1:sink:H1": "src1:src:H1",
    "snk1:sink:H1": "t1:src:H1",
})

# Run the pipeline
p.run()

# Check the result of the sink queue to see outputs
assert snk.deqs["snk1:sink:H1"] == deque([13, 12, 11])

The above example can be modified to use any data type, including json-friendly nested dictionaries, lists, and strings. The CallableTransform class can be used to create a transform element using any arbitrary function. The DeqSource and DeqSink classes are used to create source and sink elements that use collections.deque to store data.

Example 2: JSON-like iterable payloads

import datetime
import numpy
from sgn import Pipeline, IterSource, CollectSink, CallableTransform, IterFrame

# Define the payloads
payloads = [
    # Payload 1, one trusted one not
    [
        {"time": datetime.datetime.strptime("2021-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([1., 2., 3.]),
         "trusted": True},
        {"time": datetime.datetime.strptime("2021-01-01T00:00:01", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([1., numpy.nan, 3.]),
         "trusted": False},
    ],
    # Payload 2, both trusted
    [
        {"time": datetime.datetime.strptime("2021-01-01T00:00:02", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([4., 5., 6.]),
         "trusted": True},
        {"time": datetime.datetime.strptime("2021-01-01T00:00:03", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([7., 8., 9.]),
         "trusted": True},
    ],
]

# Define a function to use in the pipeline
def demean_if_trusted(frame: IterDictFrame):
    if frame.data is None:
        return None

    results = []
    for packet in frame.data:
        new_packet = packet.copy()
        if new_packet["trusted"]:
            new_packet["buffer"] -= numpy.mean(new_packet["buffer"])
        results.append(new_packet)
    return results

# Create source element
src = IterSource(
    name="src1",
    source_pad_names=["H1"],
    iters={"src1:src:H1": payloads},
    frame_factory=IterFrame,
)

# Create a transform element using an arbitrary function
trn1 = CallableTransform.from_callable(
    name="t1",
    sink_pad_names=["H1"],
    callable=demean_if_trusted,
    output_name="H1",
)

# Create the sink so we can access the data after running
snk = CollectSink(
    name="snk1",
    sink_pad_names=("H1",),
)

# Create the Pipeline
p = Pipeline()

# Insert elements into pipeline and link them explicitly
p.insert(src, trn1, snk, link_map={
    "t1:sink:H1": "src1:src:H1",
    "snk1:sink:H1": "t1:src:H1",
})

# Run the pipeline
p.run()

# Check the result of the sink queue to see outputs
assert list(snk.iters["snk1:sink:H1"]) == [
    [
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 0),
         "buffer": numpy.array([-1., 0., 1.]),
         "trusted": True},
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 1),
         "buffer": numpy.array([1., numpy.nan, 3.]),
         "trusted": False},
    ],
    [
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 2),
         "buffer": numpy.array([-1., 0., 1.]),
         "trusted": True},
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 3),
         "buffer": numpy.array([-1., 0., 1.]),
         "trusted": True},
    ]
]
Edited by James Kennington

Merge request reports

Loading