Add high-level Stream API to build GStreamer pipelines
This MR adds a high level Stream API to simplify the creation of building GStreamer pipelines within GstLAL. The two main drivers are to:
- Avoid excessive boilerplate
- Avoid exposing technical details of GStreamer's machinery to create pipelines
For this, a new module gstlal.stream
has been created with a Stream
class which:
- Manages the main event loop, pipeline and handlers in one central location
- Allows
pipeparts
-style elements to be registered as Stream methods, allowing chaining - Folds in common boilerplate typically used when creating a pipeline:
- Gst imports and init
- Start mainloop, pipeline instances
- Handles state management, seeking
- Adds a wrapper around appsink, AppSync allowing registering callbacks when new buffers arrive via
stream.bufsink
This also converts all existing pipelines in gstlal
, gstlal-ugly
and gstlal-inspiral
.
Example (gstlal_ll_dq
):
tracker = NoiseTracker(options.out_path, instrument, agg_sink)
stream = Stream.from_datasource(gw_data_source_info, instrument, verbose=options.verbose)
stream.add_callback(MessageType.ELEMENT, "spectrum", tracker.on_spectrum_message)
stream.resample(quality=9) \
.capsfilter("audio/x-raw, rate=%d" % options.sample_rate) \
.queue(max_size_buffers=8) \
.whiten(fft_length=options.psd_fft_length, expand_gaps=True) \
.queue() \
.reblock() \
.bufsink(tracker.on_buffer)
stream.start()
Other changes as part of this:
-
multichannel_datasource.py
had its main function,mkwhitened_multirate_src
, split off into two parts,mkcondition
andmkmultiband
since multiple locations only used the previous function with a single rate defeating the purpose of the multibanding feature (e.g.gstlal_fake_frames
). The module was moved topipeparts/condition.py
but that was a personal choice I am happy to change if needed. -
mkhtgate
indatasource.py
was also moved topipeparts.condition
as well. -
itacac
element was also added inpipeparts
as part of this as this didn't exist previously and was needed to allow creation of this element as part of the stream API. -
-
gstlal_fake_frames
: If the output directory does not exist, create it instead of silently failing.
-
Merge request reports
Activity
requested review from @james.kennington
added 41 commits
-
cf6e4bbb...4972a321 - 14 commits from branch
master
- 1c5d330e - development of new stream API for building Gstreamer pipelines
- 68a58cee - modify gstlal_ll_dq to use stream API
- 8df29345 - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- 9f53cd90 - gstlal_play: convert to stream API
- c48064df - stream.Stream: allow pipeline naming, head.connect(), debugging output
- b7fb9806 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- 0c0a7190 - gstlal_stage_frames: convert to stream API
- abf11987 - migrate mkhtgate from datasource to pipeparts.condition
- 72381b16 - gstlal_fake_frames: convert to stream API
- e79f7420 - stream.StreamHandler: handle edge case when message structure is undefined
- ea00fc7a - reference_psd.measure_psd(): convert to stream API
- b6dda0ff - stream.py: register elements within Stream class internally rather than on module load
- 1f9b93b4 - reference_psd.py: tidy up stream in measure_psd()
- 992f4a5b - pipeparts/condition.py: add missing args to func signatures
- e133af7d - add itacac elem to pipeparts
- 1a9a6a68 - stream.py: add functionality to support gstlal_inspiral
- ee54e18e - gstlal_ll_dq: remove nxydump layer prior to appsink
- bd37fa33 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- 379c1ee7 - simplification/additions to stream API
- fe849355 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- 5b949042 - Stream: add functionality to get elem by name, post bus messages. update...
- 64c08644 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- 9333a61a - stream.py: add type annotations, minor cleanup
- 270e147a - add basic stream API tests
- 94c2b098 - stream.py: add docstrings
- a8bdeb2f - reference_psd.py: remove unused PSDHandler
- 762d8688 - svd_bank_snr.py: minor cleanup
Toggle commit list-
cf6e4bbb...4972a321 - 14 commits from branch
added 47 commits
-
762d8688...d3d546c8 - 20 commits from branch
master
- 2cd102ea - development of new stream API for building Gstreamer pipelines
- b0bcc332 - modify gstlal_ll_dq to use stream API
- 210fd1fa - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- eda0e19c - gstlal_play: convert to stream API
- c1f8fc64 - stream.Stream: allow pipeline naming, head.connect(), debugging output
- 90bfbc43 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- 685af7ea - gstlal_stage_frames: convert to stream API
- 0102ac6a - migrate mkhtgate from datasource to pipeparts.condition
- 20335bdc - gstlal_fake_frames: convert to stream API
- 3d0440de - stream.StreamHandler: handle edge case when message structure is undefined
- 12913872 - reference_psd.measure_psd(): convert to stream API
- 72face26 - stream.py: register elements within Stream class internally rather than on module load
- 6ac5bc64 - reference_psd.py: tidy up stream in measure_psd()
- 98f7911d - pipeparts/condition.py: add missing args to func signatures
- 70b197d9 - add itacac elem to pipeparts
- fbf202f9 - stream.py: add functionality to support gstlal_inspiral
- b0d7c7dc - gstlal_ll_dq: remove nxydump layer prior to appsink
- 3d6a42ec - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- a73900a9 - simplification/additions to stream API
- 9ca7793b - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- 32670f5f - Stream: add functionality to get elem by name, post bus messages. update...
- ef77a881 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- ec8d5e33 - stream.py: add type annotations, minor cleanup
- fc9c49f8 - add basic stream API tests
- 656d6073 - stream.py: add docstrings
- 5c22f384 - reference_psd.py: remove unused PSDHandler
- df83cd7d - svd_bank_snr.py: minor cleanup
Toggle commit list-
762d8688...d3d546c8 - 20 commits from branch
added 59 commits
-
df83cd7d...2c9ff3c2 - 32 commits from branch
master
- 5cb49327 - development of new stream API for building Gstreamer pipelines
- 2daf17e3 - modify gstlal_ll_dq to use stream API
- c3374c0f - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- 38f5adef - gstlal_play: convert to stream API
- 89e08075 - stream.Stream: allow pipeline naming, head.connect(), debugging output
- e5e5c60f - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- 38d66f95 - gstlal_stage_frames: convert to stream API
- 57f7c2f2 - migrate mkhtgate from datasource to pipeparts.condition
- 5840c8bc - gstlal_fake_frames: convert to stream API
- a422aaa3 - stream.StreamHandler: handle edge case when message structure is undefined
- 20c36b2c - reference_psd.measure_psd(): convert to stream API
- e6f4a3a1 - stream.py: register elements within Stream class internally rather than on module load
- a09591d3 - reference_psd.py: tidy up stream in measure_psd()
- bcd4f8b8 - pipeparts/condition.py: add missing args to func signatures
- 08a9423c - add itacac elem to pipeparts
- 00879d28 - stream.py: add functionality to support gstlal_inspiral
- 4c2e8560 - gstlal_ll_dq: remove nxydump layer prior to appsink
- 44a11604 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- 97990687 - simplification/additions to stream API
- af651170 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- 3b15cc6d - Stream: add functionality to get elem by name, post bus messages. update...
- 95cd9a12 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- 8cefafbb - stream.py: add type annotations, minor cleanup
- 3eaa4f95 - add basic stream API tests
- bd6a7318 - stream.py: add docstrings
- 5542aef6 - reference_psd.py: remove unused PSDHandler
- 68468147 - svd_bank_snr.py: minor cleanup
Toggle commit list-
df83cd7d...2c9ff3c2 - 32 commits from branch
added 35 commits
-
68468147...64852648 - 8 commits from branch
master
- 9334d448 - development of new stream API for building Gstreamer pipelines
- ad446906 - modify gstlal_ll_dq to use stream API
- d4bda00e - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- 1214ca6d - gstlal_play: convert to stream API
- 972c99a0 - stream.Stream: allow pipeline naming, head.connect(), debugging output
- 987b1f75 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- fd7ed396 - gstlal_stage_frames: convert to stream API
- 01f1c2a7 - migrate mkhtgate from datasource to pipeparts.condition
- dc61ca15 - gstlal_fake_frames: convert to stream API
- 81a64604 - stream.StreamHandler: handle edge case when message structure is undefined
- 4e432a6c - reference_psd.measure_psd(): convert to stream API
- 5fd57e43 - stream.py: register elements within Stream class internally rather than on module load
- f1d4d0d8 - reference_psd.py: tidy up stream in measure_psd()
- e2b4ac35 - pipeparts/condition.py: add missing args to func signatures
- 64a94841 - add itacac elem to pipeparts
- 30812ef1 - stream.py: add functionality to support gstlal_inspiral
- c05ee00c - gstlal_ll_dq: remove nxydump layer prior to appsink
- 9e245176 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- 78f64cf1 - simplification/additions to stream API
- 6935f93e - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- 816d552d - Stream: add functionality to get elem by name, post bus messages. update...
- c55afee6 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- 80aba599 - stream.py: add type annotations, minor cleanup
- 78b4ef5f - add basic stream API tests
- 389d7b2f - stream.py: add docstrings
- deafc0d7 - svd_bank_snr.py: minor cleanup
- 6ed52ca4 - stream.py: allow name kwarg in Stream.from_datasource()
Toggle commit list-
68468147...64852648 - 8 commits from branch
added 34 commits
-
6ed52ca4...d90a5aa3 - 5 commits from branch
master
- 44fae91d - development of new stream API for building Gstreamer pipelines
- 870b60bb - modify gstlal_ll_dq to use stream API
- 45286676 - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- 98d149b3 - gstlal_play: convert to stream API
- ff7f2e2f - stream.Stream: allow pipeline naming, head.connect(), debugging output
- ed6d4e9e - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- 6817328d - gstlal_stage_frames: convert to stream API
- c818a95d - migrate mkhtgate from datasource to pipeparts.condition
- 07681407 - gstlal_fake_frames: convert to stream API
- 288e2142 - stream.StreamHandler: handle edge case when message structure is undefined
- 107dab4c - reference_psd.measure_psd(): convert to stream API
- 66c55b89 - stream.py: register elements within Stream class internally rather than on module load
- c99d82c2 - reference_psd.py: tidy up stream in measure_psd()
- 9c1f5de4 - pipeparts/condition.py: add missing args to func signatures
- 36b22f14 - add itacac elem to pipeparts
- 836174f3 - stream.py: add functionality to support gstlal_inspiral
- d2e762de - gstlal_ll_dq: remove nxydump layer prior to appsink
- 002a520e - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- f8b64c61 - simplification/additions to stream API
- bedeeee8 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- 2c9c56c9 - Stream: add functionality to get elem by name, post bus messages. update...
- 850ba8b4 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- d960fd06 - stream.py: add type annotations, minor cleanup
- 92835950 - add basic stream API tests
- fa1cb10e - stream.py: add docstrings
- 61ede589 - svd_bank_snr.py: minor cleanup
- 62bb759c - stream.py: allow name kwarg in Stream.from_datasource()
- 1afeb05d - lloidparts.py: preserve previous names for registered elements
- d229b229 - stream.py: rename Stream.remap() -> Stream.clear() for clarity
Toggle commit list-
6ed52ca4...d90a5aa3 - 5 commits from branch
added 43 commits
-
30d600ba...b6020f6c - 13 commits from branch
master
- 8a32ee1e - development of new stream API for building Gstreamer pipelines
- c69c03f6 - modify gstlal_ll_dq to use stream API
- 7e6ac352 - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- 5f8ce159 - gstlal_play: convert to stream API
- ede7cfed - stream.Stream: allow pipeline naming, head.connect(), debugging output
- 46407847 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- 069c92c0 - gstlal_stage_frames: convert to stream API
- b8a46d12 - migrate mkhtgate from datasource to pipeparts.condition
- a0a6a48c - gstlal_fake_frames: convert to stream API
- 19b31525 - stream.StreamHandler: handle edge case when message structure is undefined
- 761cdd0a - reference_psd.measure_psd(): convert to stream API
- 17932f4e - stream.py: register elements within Stream class internally rather than on module load
- a8de5ce4 - reference_psd.py: tidy up stream in measure_psd()
- 1acd71fe - pipeparts/condition.py: add missing args to func signatures
- 73bfbe27 - add itacac elem to pipeparts
- 5ef425c0 - stream.py: add functionality to support gstlal_inspiral
- 2fdea40d - gstlal_ll_dq: remove nxydump layer prior to appsink
- bcd19931 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- 688534fb - simplification/additions to stream API
- 202f0211 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- f997ee2a - Stream: add functionality to get elem by name, post bus messages. update...
- 51c48a32 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- 244d621f - stream.py: add type annotations, minor cleanup
- 43506e3a - add basic stream API tests
- 9365427e - stream.py: add docstrings
- d26493b1 - svd_bank_snr.py: minor cleanup
- 4cdb3acc - stream.py: allow name kwarg in Stream.from_datasource()
- 7deb1744 - lloidparts.py: preserve previous names for registered elements
- fdf0b562 - stream.py: rename Stream.remap() -> Stream.clear() for clarity
- df57c045 - Make stream api inspectable
Toggle commit list-
30d600ba...b6020f6c - 13 commits from branch
added 1 commit
- dbc43f7e - lloidhandler.py: fix indentation issue from rebase
added 56 commits
-
dbc43f7e...c0ece453 - 23 commits from branch
master
- 13157ef1 - development of new stream API for building Gstreamer pipelines
- 16bf56b8 - modify gstlal_ll_dq to use stream API
- c06e670c - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- af4eab3a - gstlal_play: convert to stream API
- 0fe46ed5 - stream.Stream: allow pipeline naming, head.connect(), debugging output
- 35d6a01a - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- 195fa352 - gstlal_stage_frames: convert to stream API
- 2591adde - migrate mkhtgate from datasource to pipeparts.condition
- e92341bf - gstlal_fake_frames: convert to stream API
- c8eea019 - stream.StreamHandler: handle edge case when message structure is undefined
- 898d412d - reference_psd.measure_psd(): convert to stream API
- e970f17a - stream.py: register elements within Stream class internally rather than on module load
- 16dc5cad - reference_psd.py: tidy up stream in measure_psd()
- a25bd513 - pipeparts/condition.py: add missing args to func signatures
- 7beb6f72 - add itacac elem to pipeparts
- 653546e9 - stream.py: add functionality to support gstlal_inspiral
- e2128d22 - gstlal_ll_dq: remove nxydump layer prior to appsink
- 50297982 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- 8ea949f2 - simplification/additions to stream API
- fae2d174 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- 5fb9ae3c - Stream: add functionality to get elem by name, post bus messages. update...
- d2752752 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- d1ab865b - stream.py: add type annotations, minor cleanup
- 619f5bc8 - add basic stream API tests
- c2e109e3 - stream.py: add docstrings
- 33e6d70c - svd_bank_snr.py: minor cleanup
- c39cec4c - stream.py: allow name kwarg in Stream.from_datasource()
- a0fd8ea5 - lloidparts.py: preserve previous names for registered elements
- ced9f1a8 - stream.py: rename Stream.remap() -> Stream.clear() for clarity
- 1f616c4a - Make stream api inspectable
- bf0ee9d7 - lloidhandler.py: fix indentation issue from rebase
- 66971716 - stream.py: remove unused import
- e3011955 - stream.py: cast memoryview to bytes as from_buffer() methods do not handle this case
Toggle commit list-
dbc43f7e...c0ece453 - 23 commits from branch
A couple of unrelated additions to this merge request involve:
-
gstlal_fake_frames
: If the output directory does not exist, create it instead of silently failing. -
multichannel_datasource.py
had its main function,mkwhitened_multirate_src
, split off into two parts,mkcondition
andmkmultiband
since multiple locations only used the previous function with a single rate defeating the purpose of the multibanding feature (e.g.gstlal_fake_frames
). The module was moved topipeparts/condition.py
but that was a personal choice I am happy to change if needed.
-
added 53 commits
-
e3011955...cd067f5e - 20 commits from branch
master
- 78ce1775 - development of new stream API for building Gstreamer pipelines
- 92886d25 - modify gstlal_ll_dq to use stream API
- 7dea369e - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
- 327cc425 - gstlal_play: convert to stream API
- d696b5c2 - stream.Stream: allow pipeline naming, head.connect(), debugging output
- a26d5d6f - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
- a870445f - gstlal_stage_frames: convert to stream API
- b18016dc - migrate mkhtgate from datasource to pipeparts.condition
- 0cb3f8c1 - gstlal_fake_frames: convert to stream API
- de012208 - stream.StreamHandler: handle edge case when message structure is undefined
- 33368998 - reference_psd.measure_psd(): convert to stream API
- e3176046 - stream.py: register elements within Stream class internally rather than on module load
- 021c44c0 - reference_psd.py: tidy up stream in measure_psd()
- 45248185 - pipeparts/condition.py: add missing args to func signatures
- dd1c5c0c - add itacac elem to pipeparts
- f9c335f1 - stream.py: add functionality to support gstlal_inspiral
- 77b7b6d3 - gstlal_ll_dq: remove nxydump layer prior to appsink
- 689298b8 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
- b2e81ea4 - simplification/additions to stream API
- 8edacd00 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
- d54435fd - Stream: add functionality to get elem by name, post bus messages. update...
- 5126ac2e - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
- d8fadad0 - stream.py: add type annotations, minor cleanup
- 73029631 - add basic stream API tests
- 07d5cfff - stream.py: add docstrings
- 92d8591c - svd_bank_snr.py: minor cleanup
- 4159a507 - stream.py: allow name kwarg in Stream.from_datasource()
- eaf3c2fa - lloidparts.py: preserve previous names for registered elements
- 33c876bd - stream.py: rename Stream.remap() -> Stream.clear() for clarity
- e5ca1efd - fix after rebase
- a9992a2b - lloidhandler.py: fix indentation issue from rebase
- 211a037e - stream.py: remove unused import
- a12ab771 - stream.py: cast memoryview to bytes as from_buffer() methods do not handle this case
Toggle commit list-
e3011955...cd067f5e - 20 commits from branch
added 1 commit
- 109b7a86 - test_element_registry.py: skip tests due to function aliases in pipeparts
added 1 commit
- 7775f634 - Update stream API with recent upstream changes
added 1 commit
- 800c66f7 - gstlal_play: remove unused imports, fix typos