Skip to content

Draft: add resource source

Chad Hanna requested to merge resource-source into main

This MR introduces a new base source element called TSResourceSource that is derived from TSSource

TSResourceSource helps developers by providing a scaffolding to build more complex source elements. The user provides a list of TSResource derived class instances. These resources must define certain methods namely,

channels(self)
rate(self)
set_data(self, frame)
latest_offset(self)

By doing so, the source element can properly prepare output frames. Below is an example of a Random Noise resource

@dataclass
class RandomNoise(TSResource):
    sample_rate: int = None

    def channels(self):
        return ()

    def rate(self):
        return self.sample_rate

    def latest_offset(self):
        return None  # means that any frame we get from TSSource prepare_frame we can deal with

    def set_data(self, frame):
        for buf in frame:
            buf.set_data(numpy.random.randn(*buf.shape))
        return frame

In reality developers will probably use this to instead define some external resource, e.g, data arriving over a network connection where the data defines the sample rate and channels. A developer could then subclass this to suit their needs.

This MR also adds a version of TSResource designed for resources that will be acquiring data in a separate subprocess called TSSubprocessResource

For this the user needs to define:

sub_process_get_data()
rate()
channels()

sub_process_get_data() must fill an internal multiprocessing queue with Series Buffers. Here is an example

@dataclass
class RandomNoiseRTSubprocess(TSSubprocessResource):
    sample_rate: int = None

    def channels(self):
        return ()

    def rate(self):
        return self.sample_rate

    def sub_process_get_data(self):
        # Simulate a real-time source producing random noise
        self.running.set()
        t0 = int(time.time())
        while self.running.is_set():
            shape = self.channels() + (self.rate(),)
            buf = SeriesBuffer(
                offset=Offset.fromsec(t0), shape=shape, sample_rate=self.rate()
            )
            buf.set_data(numpy.random.randn(*buf.shape))
            t0 += 1
            self.in_queue.put(buf)
            time.sleep(max(0, t0 - time.time()))

This class simulates a real-time source. Below is an example of how you would instantiate a pipeline with these classes

#!/usr/bin/env python3
import time
from dataclasses import dataclass
import numpy
from sgn import Pipeline
from sgnts.base import SeriesBuffer, TSResourceSource, TSSubprocessResource, TSResource
from sgnts.base.offset import Offset
from sgnts.sinks import FakeSeriesSink


@dataclass
class RandomNoise(TSResource):
    sample_rate: int = None

    def channels(self):
        return ()

    def rate(self):
        return self.sample_rate

    def latest_offset(self):
        return None  # means that any frame we get from TSSource prepare_frame we can deal with

    def set_data(self, frame):
        for buf in frame:
            buf.set_data(numpy.random.randn(*buf.shape))
        return frame


@dataclass
class RandomNoiseRTSubprocess(TSSubprocessResource):
    sample_rate: int = None

    def channels(self):
        return ()

    def rate(self):
        return self.sample_rate

    def sub_process_get_data(self):
        # Simulate a real-time source producing random noise
        self.running.set()
        t0 = int(time.time())
        while self.running.is_set():
            shape = self.channels() + (self.rate(),)
            buf = SeriesBuffer(
                offset=Offset.fromsec(t0), shape=shape, sample_rate=self.rate()
            )
            buf.set_data(numpy.random.randn(*buf.shape))
            t0 += 1
            self.in_queue.put(buf)
            time.sleep(max(0, t0 - time.time()))


def main():
    src = TSResourceSource(
        name="random",
        source_resources=[
            RandomNoise(name="random", sample_rate=2048),
            RandomNoiseRTSubprocess(name="rt_random", sample_rate=2048),
        ],
        t0=int(time.time()),
        end=int(time.time()) + 10,
    )

    sink = FakeSeriesSink(name="fakesink", sink_pad_names=["blah", "blah2"])

    # Create the Pipeline
    p = Pipeline()

    # FIXME simplify linking
    p.insert(
        src,
        sink,
        link_map={
            sink.snks["blah"]: src.srcs["random"],
            sink.snks["blah2"]: src.srcs["rt_random"],
        },
    )

    # Run the pipeline
    p.run()


if __name__ == "__main__":
    main()

Which gives:

e1-056827:~ crh184$ SGNLOGLEVEL=DEBUG ./sgnts-source
Executing graph loop 1:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=[])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=None)
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=[])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406065152, offset_end=28376406065152, shape=(0,), sample_rate=2048, duration=0, data=None)
Executing graph loop 2:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.53704617 ... -1.67203457])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=None)
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.53704617 ... -1.67203457])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406065152, offset_end=28376406081536, shape=(2048,), sample_rate=2048, duration=1000000000, data=None)
Executing graph loop 3:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.83562094 ... 0.9555014 ])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.15320658 ...  1.2496674 ])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.83562094 ... 0.9555014 ])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406081536, offset_end=28376406097920, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.15320658 ...  1.2496674 ])
Executing graph loop 4:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 1.3714558  ... -0.80924207])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.42128249 ... -0.59850558])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 1.3714558  ... -0.80924207])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406097920, offset_end=28376406114304, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.42128249 ... -0.59850558])
Executing graph loop 5:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.38280121 ... 0.37295048])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.8018671  ... -0.02763626])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.38280121 ... 0.37295048])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406114304, offset_end=28376406130688, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.8018671  ... -0.02763626])
Executing graph loop 6:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.37315497 ... -2.27350226])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.01592095 ...  0.17205565])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.37315497 ... -2.27350226])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406130688, offset_end=28376406147072, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.01592095 ...  0.17205565])
Executing graph loop 7:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.24512576 ... -0.01477887])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.99255582 ... -0.58535543])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.24512576 ... -0.01477887])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406147072, offset_end=28376406163456, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.99255582 ... -0.58535543])
Executing graph loop 8:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.18185802 ... -0.96350638])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.73989862 ...  1.11464325])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.18185802 ... -0.96350638])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406163456, offset_end=28376406179840, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-1.73989862 ...  1.11464325])
Executing graph loop 9:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.59269431 ... 1.20664535])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.28274651 ... -0.95696311])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.59269431 ... 1.20664535])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406179840, offset_end=28376406196224, shape=(2048,), sample_rate=2048, duration=1000000000, data=[ 0.28274651 ... -0.95696311])
Executing graph loop 10:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.9652405 ... 0.004204 ])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.77363847 ... -0.7684594 ])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.9652405 ... 0.004204 ])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406196224, offset_end=28376406212608, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.77363847 ... -0.7684594 ])
Executing graph loop 11:
	SourcePad(name='random:src:random', _id='51b0445b583741d6962cb793f5d38aeb') : 
	SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.22302944 ... 0.82013531])
	SourcePad(name='random:src:rt_random', _id='e93465f7a25d448096344611c055e55b') : 
	SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.21702349 ...  0.13355077])
	SinkPad(name='fakesink:sink:blah', _id='210d310ebeca4ba1982309ed6c9a8106'):
	SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[0.22302944 ... 0.82013531])
	SinkPad(name='fakesink:sink:blah2', _id='b172bcc4de23498aa1b007b5d4c3c684'):
	SeriesBuffer(offset=28376406212608, offset_end=28376406228992, shape=(2048,), sample_rate=2048, duration=1000000000, data=[-0.21702349 ...  0.13355077])
Edited by Chad Hanna

Merge request reports

Loading