End-to-end Workflow Examples
Changes

This MR updates the provided API in `sources`, `sinks`, and `transforms` to be generally useful without the need for subclassing.
Adds:

"Null" elements for testing (essentially a refactor of the "Fake" elements with clearer docs and names)

- `NullSource`: source that always produces empty frames, only used for testing
- `NullSink`: sink that does nothing, only used for testing

"Iter" elements for generating and collecting frames of a particular type

- `IterSource`: iterable-based source element, one iterable per source pad
- `CollectSink`: iterable-based sink element, one iterable per sink pad (must support `.append` syntax)
- `IterFrame`: a `Frame` subclass whose `data` attribute must be iterable

"Deque" flavors of the above

- `DequeSource`: deque-based source element, one deque per source pad
- `DequeSink`: deque-based sink element, one deque per sink pad

"Generic N->M Transforms"

- `CallableTransform`: transform element based on generic mappings from (inputs, ...) -> callable, with control over source pad naming
- helper function `CallableTransform.from_callable` for the simple case of N==M==1

"Docs and Examples"

- examples for end-to-end use
- small example added to the docs
- docs updated to be in sync with README.md
Removes:

- `FakeSrc` element that was only used for testing (not intended for extensibility / user API)
- `FakeSink` element (same as above)
Motivation

Primary goals:

- make the base `sgn` package useful for generic workflows
- provide iterable-based source and sink elements for handling sequences of arbitrary Python objects
- provide a transform element that can wrap arbitrary Python callables

Obviates the need for older MRs !26 (closed) and !25 (closed).
Examples

`Null` elements

Example 0: Trivial usage with `NullSource` and `NullSink`

```python
from sgn import Pipeline, NullSource, NullSink

# Create pipeline in one go
p = Pipeline()
p.insert(NullSource(name='src1',
                    source_pad_names=["H1"]),
         NullSink(name='snk1',
                  sink_pad_names=["H1"]),
         link_map={"snk1:sink:H1": "src1:src:H1"})
p.run()
```
Example 1: Simple example using a stream of scalars

```python
from collections import deque

from sgn import Pipeline, CollectSink, DequeSource, CallableTransform

# Define a function to use in the pipeline
def add_ten(frame):
    return None if frame.data is None else frame.data + 10

# Create source element
src = DequeSource(
    name="src1",
    source_pad_names=["H1"],
    deqs={"src1:src:H1": deque([1, 2, 3])},
    limits=3,
)

# Create a transform element using an arbitrary function
trn1 = CallableTransform.from_callable(
    name="t1",
    sink_pad_names=["H1"],
    callable=add_ten,
    output_name="H1",
)

# Create the sink so we can access the data after running
snk = CollectSink(
    name="snk1",
    sink_pad_names=("H1",),
)

# Create the Pipeline
p = Pipeline()

# Insert elements into pipeline and link them explicitly
p.insert(src, trn1, snk, link_map={
    "t1:sink:H1": "src1:src:H1",
    "snk1:sink:H1": "t1:src:H1",
})

# Run the pipeline
p.run()

# Check the result of the sink queue to see outputs
assert snk.deqs["snk1:sink:H1"] == deque([13, 12, 11])
```
The above example can be modified to use any data type, including JSON-friendly nested dictionaries, lists, and strings. The `CallableTransform` class can be used to create a transform element from any arbitrary function, and the `DequeSource` and `DequeSink` classes create source and sink elements that use `collections.deque` to store data.
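The following is a minimal sketch of a deque-to-deque pipeline that also shows how `functools.partial` can parametrize the wrapped callable without any subclassing. Treat it as illustrative rather than confirmed usage: `DequeSink`'s constructor arguments and its `deqs` attribute are assumed to mirror `DequeSource` and `CollectSink` from Example 1, and the `limits` argument and output ordering are copied from that example.

```python
import functools
from collections import deque

from sgn import Pipeline, DequeSource, DequeSink, CallableTransform

def scale(frame, factor):
    """Multiply the frame's data by `factor`, passing empty frames through."""
    return None if frame.data is None else frame.data * factor

# Deque-backed source, mirroring Example 1
src = DequeSource(
    name="src1",
    source_pad_names=["H1"],
    deqs={"src1:src:H1": deque([1, 2, 3])},
    limits=3,
)

# functools.partial turns a parametrized function into a single-frame callable
trn = CallableTransform.from_callable(
    name="t1",
    sink_pad_names=["H1"],
    callable=functools.partial(scale, factor=2),
    output_name="H1",
)

# Assumption: DequeSink takes the same constructor arguments as CollectSink
snk = DequeSink(
    name="snk1",
    sink_pad_names=("H1",),
)

p = Pipeline()
p.insert(src, trn, snk, link_map={
    "t1:sink:H1": "src1:src:H1",
    "snk1:sink:H1": "t1:src:H1",
})
p.run()

# Assumption: DequeSink exposes collected data via a `deqs` mapping keyed by pad,
# with the same output ordering as Example 1
assert snk.deqs["snk1:sink:H1"] == deque([6, 4, 2])
```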
Example 2: JSON-like iterable payloads

```python
import datetime

import numpy

from sgn import Pipeline, IterSource, CollectSink, CallableTransform, IterFrame

# Define the payloads
payloads = [
    # Payload 1, one trusted and one not
    [
        {"time": datetime.datetime.strptime("2021-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([1., 2., 3.]),
         "trusted": True},
        {"time": datetime.datetime.strptime("2021-01-01T00:00:01", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([1., numpy.nan, 3.]),
         "trusted": False},
    ],
    # Payload 2, both trusted
    [
        {"time": datetime.datetime.strptime("2021-01-01T00:00:02", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([4., 5., 6.]),
         "trusted": True},
        {"time": datetime.datetime.strptime("2021-01-01T00:00:03", "%Y-%m-%dT%H:%M:%S"),
         "buffer": numpy.array([7., 8., 9.]),
         "trusted": True},
    ],
]

# Define a function to use in the pipeline
def demean_if_trusted(frame: IterFrame):
    if frame.data is None:
        return None
    results = []
    for packet in frame.data:
        new_packet = packet.copy()
        if new_packet["trusted"]:
            new_packet["buffer"] -= numpy.mean(new_packet["buffer"])
        results.append(new_packet)
    return results

# Create source element
src = IterSource(
    name="src1",
    source_pad_names=["H1"],
    iters={"src1:src:H1": payloads},
    frame_factory=IterFrame,
)

# Create a transform element using an arbitrary function
trn1 = CallableTransform.from_callable(
    name="t1",
    sink_pad_names=["H1"],
    callable=demean_if_trusted,
    output_name="H1",
)

# Create the sink so we can access the data after running
snk = CollectSink(
    name="snk1",
    sink_pad_names=("H1",),
)

# Create the Pipeline
p = Pipeline()

# Insert elements into pipeline and link them explicitly
p.insert(src, trn1, snk, link_map={
    "t1:sink:H1": "src1:src:H1",
    "snk1:sink:H1": "t1:src:H1",
})

# Run the pipeline
p.run()
# Check the result of the sink queue to see outputs
expected = [
    [
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 0),
         "buffer": numpy.array([-1., 0., 1.]),
         "trusted": True},
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 1),
         "buffer": numpy.array([1., numpy.nan, 3.]),
         "trusted": False},
    ],
    [
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 2),
         "buffer": numpy.array([-1., 0., 1.]),
         "trusted": True},
        {"time": datetime.datetime(2021, 1, 1, 0, 0, 3),
         "buffer": numpy.array([-1., 0., 1.]),
         "trusted": True},
    ],
]
# Compare packet by packet: the buffers need numpy.allclose (with equal_nan=True
# for the untrusted packet) rather than plain == inside a dict comparison
for got_payload, exp_payload in zip(snk.iters["snk1:sink:H1"], expected):
    for got, exp in zip(got_payload, exp_payload):
        assert got["time"] == exp["time"]
        assert got["trusted"] == exp["trusted"]
        assert numpy.allclose(got["buffer"], exp["buffer"], equal_nan=True)
```
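Because the transform logic is an ordinary function over frames, it can also be exercised outside of any pipeline. The short sketch below continues Example 2 and assumes `IterFrame` can be constructed directly with a `data` keyword argument (the example above only passes it as a `frame_factory`), so treat it as illustrative rather than a confirmed API:

```python
# Continuing Example 2: call the transform function on a hand-built frame.
# Assumption: IterFrame(data=...) is a valid way to construct the frame.
test_frame = IterFrame(data=[{"time": datetime.datetime(2021, 1, 1),
                              "buffer": numpy.array([10., 20., 30.]),
                              "trusted": True}])
out = demean_if_trusted(test_frame)
assert numpy.allclose(out[0]["buffer"], [-10., 0., 10.])
```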