Draft: add resource source
This MR introduces a new base source element called TSResourceSource
that is derived from TSSource
TSResourceSource
helps developers by providing a scaffolding to build more complex source elements. The user provides a list of TSResource derived class instances. These resources must define certain methods namely,
channels(self)
rate(self)
set_data(self, frame)
latest_offset(self)
By doing so, the source element can properly prepare output frames. Below is an example of a Random Noise resource
@dataclass
class RandomNoise(TSResource):
sample_rate: int = None
def channels(self):
return ()
def rate(self):
return self.sample_rate
def latest_offset(self):
return None # means that any frame we get from TSSource prepare_frame we can deal with
def set_data(self, frame):
for buf in frame:
buf.set_data(numpy.random.randn(*buf.shape))
return frame
In reality developers will probably use this to instead define some external resource, e.g, data arriving over a network connection where the data defines the sample rate and channels. A developer could then subclass this to suit their needs.
This MR also adds a version of TSResource designed for resources that will be acquiring data in a separate subprocess called TSSubprocessResource
For this the user needs to define:
sub_process_get_data()
rate()
channels()
sub_process_get_data()
must fill an internal multiprocessing queue with Series Buffers. Here is an example
@dataclass
class RandomNoiseRTSubprocess(TSSubprocessResource):
sample_rate: int = None
def channels(self):
return ()
def rate(self):
return self.sample_rate
def sub_process_get_data(self):
# Simulate a real-time source producing random noise
self.running.set()
t0 = int(time.time())
while self.running.is_set():
shape = self.channels() + (self.rate(),)
buf = SeriesBuffer(
offset=Offset.fromsec(t0), shape=shape, sample_rate=self.rate()
)
buf.set_data(numpy.random.randn(*buf.shape))
t0 += 1
self.in_queue.put(buf)
time.sleep(max(0, t0 - time.time()))
This class simulates a real-time source. Below is an example of how you would instantiate a pipeline with these classes
#!/usr/bin/env python3
import time
from dataclasses import dataclass
import numpy
from sgn import Pipeline
from sgnts.base import SeriesBuffer, TSResourceSource, TSSubprocessResource, TSResource
from sgnts.base.offset import Offset
from sgnts.sinks import FakeSeriesSink
@dataclass
class RandomNoise(TSResource):
sample_rate: int = None
def channels(self):
return ()
def rate(self):
return self.sample_rate
def latest_offset(self):
return None # means that any frame we get from TSSource prepare_frame we can deal with
def set_data(self, frame):
for buf in frame:
buf.set_data(numpy.random.randn(*buf.shape))
return frame
@dataclass
class RandomNoiseRTSubprocess(TSSubprocessResource):
sample_rate: int = None
def channels(self):
return ()
def rate(self):
return self.sample_rate
def sub_process_get_data(self):
# Simulate a real-time source producing random noise
self.running.set()
t0 = int(time.time())
while self.running.is_set():
shape = self.channels() + (self.rate(),)
buf = SeriesBuffer(
offset=Offset.fromsec(t0), shape=shape, sample_rate=self.rate()
)
buf.set_data(numpy.random.randn(*buf.shape))
t0 += 1
self.in_queue.put(buf)
time.sleep(max(0, t0 - time.time()))
def main():
src = TSResourceSource(
name="random",
source_resources=[
RandomNoise(name="random", sample_rate=2048),
RandomNoiseRTSubprocess(name="rt_random", sample_rate=2048),
],
t0=int(time.time()),
end=int(time.time()) + 10,
)
sink = FakeSeriesSink(name="fakesink", sink_pad_names=["blah", "blah2"])
# Create the Pipeline
p = Pipeline()
# FIXME simplify linking
p.insert(
src,
sink,
link_map={
sink.snks["blah"]: src.srcs["random"],
sink.snks["blah2"]: src.srcs["rt_random"],
},
)
# Run the pipeline
p.run()
if __name__ == "__main__":
main()
Which gives:
e1-056827:~ crh184$ SGNLOGLEVEL=DEBUG ./sgnts-source
Executing graph loop 1:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=[])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=None)
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=[])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=None)
Executing graph loop 2:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.53704617 ... -1.67203457])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=None)
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.53704617 ... -1.67203457])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=None)
Executing graph loop 3:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.83562094 ... 0.9555014 ])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.15320658 ... 1.2496674 ])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.83562094 ... 0.9555014 ])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.15320658 ... 1.2496674 ])
Executing graph loop 4:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 1.3714558 ... -0.80924207])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.42128249 ... -0.59850558])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 1.3714558 ... -0.80924207])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.42128249 ... -0.59850558])
Executing graph loop 5:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.38280121 ... 0.37295048])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.8018671 ... -0.02763626])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.38280121 ... 0.37295048])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.8018671 ... -0.02763626])
Executing graph loop 6:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.37315497 ... -2.27350226])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.01592095 ... 0.17205565])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.37315497 ... -2.27350226])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.01592095 ... 0.17205565])
Executing graph loop 7:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.24512576 ... -0.01477887])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.99255582 ... -0.58535543])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.24512576 ... -0.01477887])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.99255582 ... -0.58535543])
Executing graph loop 8:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.18185802 ... -0.96350638])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.73989862 ... 1.11464325])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.18185802 ... -0.96350638])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.73989862 ... 1.11464325])
Executing graph loop 9:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.59269431 ... 1.20664535])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.28274651 ... -0.95696311])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.59269431 ... 1.20664535])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.28274651 ... -0.95696311])
Executing graph loop 10:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.9652405 ... 0.004204 ])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.77363847 ... -0.7684594 ])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.9652405 ... 0.004204 ])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.77363847 ... -0.7684594 ])
Executing graph loop 11:
SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') :
SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.22302944 ... 0.82013531])
SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') :
SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.21702349 ... 0.13355077])
SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.22302944 ... 0.82013531])
SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.21702349 ... 0.13355077])