Skip to content
Snippets Groups Projects

Add high-level Stream API to build GStreamer pipelines

Merged Patrick Godwin requested to merge stream_api into master

This MR adds a high level Stream API to simplify the creation of building GStreamer pipelines within GstLAL. The two main drivers are to:

  • Avoid excessive boilerplate
  • Avoid exposing technical details of GStreamer's machinery to create pipelines

For this, a new module gstlal.stream has been created with a Stream class which:

  • Manages the main event loop, pipeline and handlers in one central location
  • Allows pipeparts-style elements to be registered as Stream methods, allowing chaining
  • Folds in common boilerplate typically used when creating a pipeline:
    • Gst imports and init
    • Start mainloop, pipeline instances
    • Handles state management, seeking
  • Adds a wrapper around appsink, AppSync allowing registering callbacks when new buffers arrive via stream.bufsink

This also converts all existing pipelines in gstlal, gstlal-ugly and gstlal-inspiral.

Example (gstlal_ll_dq):

tracker = NoiseTracker(options.out_path, instrument, agg_sink)

stream = Stream.from_datasource(gw_data_source_info, instrument, verbose=options.verbose)
stream.add_callback(MessageType.ELEMENT, "spectrum", tracker.on_spectrum_message)

stream.resample(quality=9) \
    .capsfilter("audio/x-raw, rate=%d" % options.sample_rate) \
    .queue(max_size_buffers=8) \
    .whiten(fft_length=options.psd_fft_length, expand_gaps=True) \
    .queue() \
    .reblock() \
    .bufsink(tracker.on_buffer)

stream.start()

Other changes as part of this:

  • multichannel_datasource.py had its main function, mkwhitened_multirate_src, split off into two parts, mkcondition and mkmultiband since multiple locations only used the previous function with a single rate defeating the purpose of the multibanding feature (e.g. gstlal_fake_frames). The module was moved to pipeparts/condition.py but that was a personal choice I am happy to change if needed.
  • mkhtgate in datasource.py was also moved to pipeparts.condition as well.
  • itacac element was also added in pipeparts as part of this as this didn't exist previously and was needed to allow creation of this element as part of the stream API.
    • gstlal_fake_frames: If the output directory does not exist, create it instead of silently failing.
Edited by Patrick Godwin

Merge request reports

Loading
Loading

Activity

Filter activity
  • Approvals
  • Assignees & reviewers
  • Comments (from bots)
  • Comments (from users)
  • Commits & branches
  • Edits
  • Labels
  • Lock status
  • Mentions
  • Merge request status
  • Tracking
  • Patrick Godwin changed the description

    changed the description

  • requested review from @james.kennington

  • Patrick Godwin added 41 commits

    added 41 commits

    • cf6e4bbb...4972a321 - 14 commits from branch master
    • 1c5d330e - development of new stream API for building Gstreamer pipelines
    • 68a58cee - modify gstlal_ll_dq to use stream API
    • 8df29345 - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • 9f53cd90 - gstlal_play: convert to stream API
    • c48064df - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • b7fb9806 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • 0c0a7190 - gstlal_stage_frames: convert to stream API
    • abf11987 - migrate mkhtgate from datasource to pipeparts.condition
    • 72381b16 - gstlal_fake_frames: convert to stream API
    • e79f7420 - stream.StreamHandler: handle edge case when message structure is undefined
    • ea00fc7a - reference_psd.measure_psd(): convert to stream API
    • b6dda0ff - stream.py: register elements within Stream class internally rather than on module load
    • 1f9b93b4 - reference_psd.py: tidy up stream in measure_psd()
    • 992f4a5b - pipeparts/condition.py: add missing args to func signatures
    • e133af7d - add itacac elem to pipeparts
    • 1a9a6a68 - stream.py: add functionality to support gstlal_inspiral
    • ee54e18e - gstlal_ll_dq: remove nxydump layer prior to appsink
    • bd37fa33 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • 379c1ee7 - simplification/additions to stream API
    • fe849355 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • 5b949042 - Stream: add functionality to get elem by name, post bus messages. update...
    • 64c08644 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • 9333a61a - stream.py: add type annotations, minor cleanup
    • 270e147a - add basic stream API tests
    • 94c2b098 - stream.py: add docstrings
    • a8bdeb2f - reference_psd.py: remove unused PSDHandler
    • 762d8688 - svd_bank_snr.py: minor cleanup

    Compare with previous version

  • Patrick Godwin added 47 commits

    added 47 commits

    • 762d8688...d3d546c8 - 20 commits from branch master
    • 2cd102ea - development of new stream API for building Gstreamer pipelines
    • b0bcc332 - modify gstlal_ll_dq to use stream API
    • 210fd1fa - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • eda0e19c - gstlal_play: convert to stream API
    • c1f8fc64 - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • 90bfbc43 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • 685af7ea - gstlal_stage_frames: convert to stream API
    • 0102ac6a - migrate mkhtgate from datasource to pipeparts.condition
    • 20335bdc - gstlal_fake_frames: convert to stream API
    • 3d0440de - stream.StreamHandler: handle edge case when message structure is undefined
    • 12913872 - reference_psd.measure_psd(): convert to stream API
    • 72face26 - stream.py: register elements within Stream class internally rather than on module load
    • 6ac5bc64 - reference_psd.py: tidy up stream in measure_psd()
    • 98f7911d - pipeparts/condition.py: add missing args to func signatures
    • 70b197d9 - add itacac elem to pipeparts
    • fbf202f9 - stream.py: add functionality to support gstlal_inspiral
    • b0d7c7dc - gstlal_ll_dq: remove nxydump layer prior to appsink
    • 3d6a42ec - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • a73900a9 - simplification/additions to stream API
    • 9ca7793b - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • 32670f5f - Stream: add functionality to get elem by name, post bus messages. update...
    • ef77a881 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • ec8d5e33 - stream.py: add type annotations, minor cleanup
    • fc9c49f8 - add basic stream API tests
    • 656d6073 - stream.py: add docstrings
    • 5c22f384 - reference_psd.py: remove unused PSDHandler
    • df83cd7d - svd_bank_snr.py: minor cleanup

    Compare with previous version

  • Patrick Godwin added 59 commits

    added 59 commits

    • df83cd7d...2c9ff3c2 - 32 commits from branch master
    • 5cb49327 - development of new stream API for building Gstreamer pipelines
    • 2daf17e3 - modify gstlal_ll_dq to use stream API
    • c3374c0f - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • 38f5adef - gstlal_play: convert to stream API
    • 89e08075 - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • e5e5c60f - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • 38d66f95 - gstlal_stage_frames: convert to stream API
    • 57f7c2f2 - migrate mkhtgate from datasource to pipeparts.condition
    • 5840c8bc - gstlal_fake_frames: convert to stream API
    • a422aaa3 - stream.StreamHandler: handle edge case when message structure is undefined
    • 20c36b2c - reference_psd.measure_psd(): convert to stream API
    • e6f4a3a1 - stream.py: register elements within Stream class internally rather than on module load
    • a09591d3 - reference_psd.py: tidy up stream in measure_psd()
    • bcd4f8b8 - pipeparts/condition.py: add missing args to func signatures
    • 08a9423c - add itacac elem to pipeparts
    • 00879d28 - stream.py: add functionality to support gstlal_inspiral
    • 4c2e8560 - gstlal_ll_dq: remove nxydump layer prior to appsink
    • 44a11604 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • 97990687 - simplification/additions to stream API
    • af651170 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • 3b15cc6d - Stream: add functionality to get elem by name, post bus messages. update...
    • 95cd9a12 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • 8cefafbb - stream.py: add type annotations, minor cleanup
    • 3eaa4f95 - add basic stream API tests
    • bd6a7318 - stream.py: add docstrings
    • 5542aef6 - reference_psd.py: remove unused PSDHandler
    • 68468147 - svd_bank_snr.py: minor cleanup

    Compare with previous version

  • Patrick Godwin added 35 commits

    added 35 commits

    • 68468147...64852648 - 8 commits from branch master
    • 9334d448 - development of new stream API for building Gstreamer pipelines
    • ad446906 - modify gstlal_ll_dq to use stream API
    • d4bda00e - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • 1214ca6d - gstlal_play: convert to stream API
    • 972c99a0 - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • 987b1f75 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • fd7ed396 - gstlal_stage_frames: convert to stream API
    • 01f1c2a7 - migrate mkhtgate from datasource to pipeparts.condition
    • dc61ca15 - gstlal_fake_frames: convert to stream API
    • 81a64604 - stream.StreamHandler: handle edge case when message structure is undefined
    • 4e432a6c - reference_psd.measure_psd(): convert to stream API
    • 5fd57e43 - stream.py: register elements within Stream class internally rather than on module load
    • f1d4d0d8 - reference_psd.py: tidy up stream in measure_psd()
    • e2b4ac35 - pipeparts/condition.py: add missing args to func signatures
    • 64a94841 - add itacac elem to pipeparts
    • 30812ef1 - stream.py: add functionality to support gstlal_inspiral
    • c05ee00c - gstlal_ll_dq: remove nxydump layer prior to appsink
    • 9e245176 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • 78f64cf1 - simplification/additions to stream API
    • 6935f93e - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • 816d552d - Stream: add functionality to get elem by name, post bus messages. update...
    • c55afee6 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • 80aba599 - stream.py: add type annotations, minor cleanup
    • 78b4ef5f - add basic stream API tests
    • 389d7b2f - stream.py: add docstrings
    • deafc0d7 - svd_bank_snr.py: minor cleanup
    • 6ed52ca4 - stream.py: allow name kwarg in Stream.from_datasource()

    Compare with previous version

  • Patrick Godwin added 34 commits

    added 34 commits

    • 6ed52ca4...d90a5aa3 - 5 commits from branch master
    • 44fae91d - development of new stream API for building Gstreamer pipelines
    • 870b60bb - modify gstlal_ll_dq to use stream API
    • 45286676 - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • 98d149b3 - gstlal_play: convert to stream API
    • ff7f2e2f - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • ed6d4e9e - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • 6817328d - gstlal_stage_frames: convert to stream API
    • c818a95d - migrate mkhtgate from datasource to pipeparts.condition
    • 07681407 - gstlal_fake_frames: convert to stream API
    • 288e2142 - stream.StreamHandler: handle edge case when message structure is undefined
    • 107dab4c - reference_psd.measure_psd(): convert to stream API
    • 66c55b89 - stream.py: register elements within Stream class internally rather than on module load
    • c99d82c2 - reference_psd.py: tidy up stream in measure_psd()
    • 9c1f5de4 - pipeparts/condition.py: add missing args to func signatures
    • 36b22f14 - add itacac elem to pipeparts
    • 836174f3 - stream.py: add functionality to support gstlal_inspiral
    • d2e762de - gstlal_ll_dq: remove nxydump layer prior to appsink
    • 002a520e - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • f8b64c61 - simplification/additions to stream API
    • bedeeee8 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • 2c9c56c9 - Stream: add functionality to get elem by name, post bus messages. update...
    • 850ba8b4 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • d960fd06 - stream.py: add type annotations, minor cleanup
    • 92835950 - add basic stream API tests
    • fa1cb10e - stream.py: add docstrings
    • 61ede589 - svd_bank_snr.py: minor cleanup
    • 62bb759c - stream.py: allow name kwarg in Stream.from_datasource()
    • 1afeb05d - lloidparts.py: preserve previous names for registered elements
    • d229b229 - stream.py: rename Stream.remap() -> Stream.clear() for clarity

    Compare with previous version

  • Patrick Godwin added 1 commit

    added 1 commit

    Compare with previous version

  • Patrick Godwin added 43 commits

    added 43 commits

    • 30d600ba...b6020f6c - 13 commits from branch master
    • 8a32ee1e - development of new stream API for building Gstreamer pipelines
    • c69c03f6 - modify gstlal_ll_dq to use stream API
    • 7e6ac352 - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • 5f8ce159 - gstlal_play: convert to stream API
    • ede7cfed - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • 46407847 - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • 069c92c0 - gstlal_stage_frames: convert to stream API
    • b8a46d12 - migrate mkhtgate from datasource to pipeparts.condition
    • a0a6a48c - gstlal_fake_frames: convert to stream API
    • 19b31525 - stream.StreamHandler: handle edge case when message structure is undefined
    • 761cdd0a - reference_psd.measure_psd(): convert to stream API
    • 17932f4e - stream.py: register elements within Stream class internally rather than on module load
    • a8de5ce4 - reference_psd.py: tidy up stream in measure_psd()
    • 1acd71fe - pipeparts/condition.py: add missing args to func signatures
    • 73bfbe27 - add itacac elem to pipeparts
    • 5ef425c0 - stream.py: add functionality to support gstlal_inspiral
    • 2fdea40d - gstlal_ll_dq: remove nxydump layer prior to appsink
    • bcd19931 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • 688534fb - simplification/additions to stream API
    • 202f0211 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • f997ee2a - Stream: add functionality to get elem by name, post bus messages. update...
    • 51c48a32 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • 244d621f - stream.py: add type annotations, minor cleanup
    • 43506e3a - add basic stream API tests
    • 9365427e - stream.py: add docstrings
    • d26493b1 - svd_bank_snr.py: minor cleanup
    • 4cdb3acc - stream.py: allow name kwarg in Stream.from_datasource()
    • 7deb1744 - lloidparts.py: preserve previous names for registered elements
    • fdf0b562 - stream.py: rename Stream.remap() -> Stream.clear() for clarity
    • df57c045 - Make stream api inspectable

    Compare with previous version

  • Patrick Godwin added 1 commit

    added 1 commit

    • dbc43f7e - lloidhandler.py: fix indentation issue from rebase

    Compare with previous version

  • Patrick Godwin added 56 commits

    added 56 commits

    • dbc43f7e...c0ece453 - 23 commits from branch master
    • 13157ef1 - development of new stream API for building Gstreamer pipelines
    • 16bf56b8 - modify gstlal_ll_dq to use stream API
    • c06e670c - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • af4eab3a - gstlal_play: convert to stream API
    • 0fe46ed5 - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • 35d6a01a - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • 195fa352 - gstlal_stage_frames: convert to stream API
    • 2591adde - migrate mkhtgate from datasource to pipeparts.condition
    • e92341bf - gstlal_fake_frames: convert to stream API
    • c8eea019 - stream.StreamHandler: handle edge case when message structure is undefined
    • 898d412d - reference_psd.measure_psd(): convert to stream API
    • e970f17a - stream.py: register elements within Stream class internally rather than on module load
    • 16dc5cad - reference_psd.py: tidy up stream in measure_psd()
    • a25bd513 - pipeparts/condition.py: add missing args to func signatures
    • 7beb6f72 - add itacac elem to pipeparts
    • 653546e9 - stream.py: add functionality to support gstlal_inspiral
    • e2128d22 - gstlal_ll_dq: remove nxydump layer prior to appsink
    • 50297982 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • 8ea949f2 - simplification/additions to stream API
    • fae2d174 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • 5fb9ae3c - Stream: add functionality to get elem by name, post bus messages. update...
    • d2752752 - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • d1ab865b - stream.py: add type annotations, minor cleanup
    • 619f5bc8 - add basic stream API tests
    • c2e109e3 - stream.py: add docstrings
    • 33e6d70c - svd_bank_snr.py: minor cleanup
    • c39cec4c - stream.py: allow name kwarg in Stream.from_datasource()
    • a0fd8ea5 - lloidparts.py: preserve previous names for registered elements
    • ced9f1a8 - stream.py: rename Stream.remap() -> Stream.clear() for clarity
    • 1f616c4a - Make stream api inspectable
    • bf0ee9d7 - lloidhandler.py: fix indentation issue from rebase
    • 66971716 - stream.py: remove unused import
    • e3011955 - stream.py: cast memoryview to bytes as from_buffer() methods do not handle this case

    Compare with previous version

  • A couple of unrelated additions to this merge request involve:

    • gstlal_fake_frames: If the output directory does not exist, create it instead of silently failing.
    • multichannel_datasource.py had its main function, mkwhitened_multirate_src, split off into two parts, mkcondition and mkmultiband since multiple locations only used the previous function with a single rate defeating the purpose of the multibanding feature (e.g. gstlal_fake_frames). The module was moved to pipeparts/condition.py but that was a personal choice I am happy to change if needed.
  • Patrick Godwin added 53 commits

    added 53 commits

    • e3011955...cd067f5e - 20 commits from branch master
    • 78ce1775 - development of new stream API for building Gstreamer pipelines
    • 92886d25 - modify gstlal_ll_dq to use stream API
    • 7dea369e - migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()
    • 327cc425 - gstlal_play: convert to stream API
    • d696b5c2 - stream.Stream: allow pipeline naming, head.connect(), debugging output
    • a26d5d6f - pipeparts/__init__.py: add convenience func for stream API to leverage framecppchannelmux
    • a870445f - gstlal_stage_frames: convert to stream API
    • b18016dc - migrate mkhtgate from datasource to pipeparts.condition
    • 0cb3f8c1 - gstlal_fake_frames: convert to stream API
    • de012208 - stream.StreamHandler: handle edge case when message structure is undefined
    • 33368998 - reference_psd.measure_psd(): convert to stream API
    • e3176046 - stream.py: register elements within Stream class internally rather than on module load
    • 021c44c0 - reference_psd.py: tidy up stream in measure_psd()
    • 45248185 - pipeparts/condition.py: add missing args to func signatures
    • dd1c5c0c - add itacac elem to pipeparts
    • f9c335f1 - stream.py: add functionality to support gstlal_inspiral
    • 77b7b6d3 - gstlal_ll_dq: remove nxydump layer prior to appsink
    • 689298b8 - gstlal_inspiral, lloidparts.py, lloidhandler.py: port to stream API, general cleanup
    • b2e81ea4 - simplification/additions to stream API
    • 8edacd00 - gstlal_inspiral_calc_snr, svd_bank_snr.py: convert to Stream API, python3 fixes
    • d54435fd - Stream: add functionality to get elem by name, post bus messages. update...
    • 5126ac2e - stream.py: replace GObject.MainLoop() -> GLib.MainLoop(), remove...
    • d8fadad0 - stream.py: add type annotations, minor cleanup
    • 73029631 - add basic stream API tests
    • 07d5cfff - stream.py: add docstrings
    • 92d8591c - svd_bank_snr.py: minor cleanup
    • 4159a507 - stream.py: allow name kwarg in Stream.from_datasource()
    • eaf3c2fa - lloidparts.py: preserve previous names for registered elements
    • 33c876bd - stream.py: rename Stream.remap() -> Stream.clear() for clarity
    • e5ca1efd - fix after rebase
    • a9992a2b - lloidhandler.py: fix indentation issue from rebase
    • 211a037e - stream.py: remove unused import
    • a12ab771 - stream.py: cast memoryview to bytes as from_buffer() methods do not handle this case

    Compare with previous version

  • Patrick Godwin added 1 commit

    added 1 commit

    • 8f34b752 - fix itacac elem name from rebase

    Compare with previous version

  • Patrick Godwin added 1 commit

    added 1 commit

    • 0320368d - fix elements from pipeparts refactor

    Compare with previous version

  • Patrick Godwin added 1 commit

    added 1 commit

    • 109b7a86 - test_element_registry.py: skip tests due to function aliases in pipeparts

    Compare with previous version

  • Patrick Godwin added 1 commit

    added 1 commit

    • 7775f634 - Update stream API with recent upstream changes

    Compare with previous version

  • Patrick Godwin changed the description

    changed the description

  • Patrick Godwin added 1 commit

    added 1 commit

    • 800c66f7 - gstlal_play: remove unused imports, fix typos

    Compare with previous version

  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
Please register or sign in to reply
Loading