Skip to content
Snippets Groups Projects
Commit cd9f8c4f authored by James Kennington's avatar James Kennington
Browse files

Add basic pipeline tools

parent 826fc6c2
No related branches found
No related tags found
1 merge request!116Add basic pipeline tools
Pipeline #312661 passed
Showing
with 309 additions and 50 deletions
......@@ -17,7 +17,7 @@ pkgpython_PYTHON = \
datafind.py \
datasource.py \
elements.py \
gsttools.py \
gstpipetools.py \
httpinterface.py \
kernels.py \
matplotlibhelper.py \
......
"""Miscellaneous utilities for working with Gstreamer
"""Miscellaneous utilities for working with Gstreamer Pipelines
References:
[1] 1.0 API: https://lazka.github.io/pgi-docs/Gst-1.0/index.html
"""
import functools
from typing import Any, Union
from typing import Any, Union, Tuple, Iterable, Optional, Callable
import gi
......@@ -15,6 +14,12 @@ from gi.repository import Gst
GObject.threads_init()
Gst.init(None)
from lal import LIGOTimeGPS
from gstlal import simplehandler
LIVE_SEGMENT = (None, None)
Time = Union[int, float, LIGOTimeGPS]
def is_element(x: Any) -> bool:
"""Test whether an object is a Gst Element. TODO: do these belong somewhere more general?
......@@ -87,3 +92,124 @@ def make_element(type_name: str, name: str = None, **properties: dict) -> Gst.El
elem.set_property(k, v)
return elem
def make_pipeline(name: str) -> Gst.Pipeline:
"""Create a Pipeline, which is the macroscopic container for Gstreamer elements and is necessary
prior to creating any elements
Args:
name:
str, the name of the pipeline
References:
[1] https://gstreamer.freedesktop.org/documentation/application-development/introduction/basics.html?gi-language=c#bins-and-pipelines
Returns:
Pipeline
"""
return Gst.Pipeline(name)
def run_pipeline(pipeline: Gst.Pipeline, segment: Tuple[Time, Time] = LIVE_SEGMENT, handlers: Optional[Iterable[Callable]] = None):
"""Run a pipeline using the main event loop
Args:
pipeline:
Gst.Pipeline, the pipeline to run
segment:
Tuple[Time, Time], a playback segment
handlers:
Iterable[Callable], default None, an optional list of functions that return Handlers. Each must
accept only a "mainloop" and "pipeline" argument.
Returns:
None
"""
if handlers is None:
handlers = (simplehandler.Handler,)
mainloop = GObject.MainLoop()
# Set handlers
for handler in handlers:
_ = handler(mainloop=mainloop, pipeline=pipeline)
if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline did not enter ready state")
seek(pipeline, segment)
if pipeline.set_state(Gst.State.PLAYING) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline did not enter playing state")
mainloop.run()
def seek(pipeline, segment: Tuple[Time, Time], flags=Gst.SeekFlags.FLUSH):
"""Create a new seek event, i.e., Gst.Event.new_seek() for a given
gps_start_time and gps_end_time, with optional flags.
Args:
pipeline:
gps_start_time:
start time as LIGOTimeGPS, float
gps_end_time:
end time as LIGOTimeGPS, float
flags:
Optional flags, see [2] for options
Notes:
In contrast to the documentation, we set the seek event directly on the pipeline sources. This is because of implementation
issues in framecpp demux that prevent the full backward propagation of the seek event from sink -> source (as is done in the
gstreamer documentation [1]).
References:
[1] https://gstreamer.freedesktop.org/documentation/additional/design/seeking.html?gi-language=python
[2] Flags https://gstreamer.freedesktop.org/documentation/gstreamer/gstsegment.html?gi-language=python#GstSeekFlags
Returns:
None
"""
start, end = segment
start_type, start_time = seek_args(start)
stop_type, stop_time = seek_args(end)
if pipeline.current_state != Gst.State.READY:
raise ValueError("pipeline must be in READY state")
pipeline.seek(rate=1.0,
format=Gst.Format(Gst.Format.TIME),
flags=flags,
start_type=start_type,
start=start_time,
stop_type=stop_type,
stop=stop_time)
for elem in pipeline.iterate_sources():
elem.seek(rate=1.0,
format=Gst.Format(Gst.Format.TIME),
flags=flags,
start_type=start_type,
start=start_time,
stop_type=stop_type,
stop=stop_time)
def seek_args(time: Time) -> Tuple[Gst.SeekType, int]:
"""Convenience function for determining the type of arguments to seek for a given time input
Args:
time:
Time, either a float or LIGOTimeGPS
Returns:
Tuple[Gst.SeekType, int]
"""
if time is None or time == -1:
return (Gst.SeekType.NONE, -1) # -1 == Gst.CLOCK_TIME_NONE
elif isinstance(time, LIGOTimeGPS):
return (Gst.SeekType.SET, time.ns())
else:
return (Gst.SeekType.SET, int(float(time) * Gst.SECOND))
......@@ -12,7 +12,7 @@ from gi.repository import Gst
GObject.threads_init()
Gst.init(None)
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools
......@@ -107,7 +107,7 @@ def caps(pipeline: pipetools.Pipeline, src: pipetools.Element, caps: Union[str,
Returns:
Element, the source element limited by the given caps (capabilities)
"""
return pipetools.make_element_with_src(pipeline, src, "capsfilter", caps=gsttools.to_caps(caps), **properties)
return pipetools.make_element_with_src(pipeline, src, "capsfilter", caps=gstpipetools.to_caps(caps), **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALWhiten.html">lal_whiten</a> element to a pipeline with useful default properties
......
......@@ -13,7 +13,7 @@ gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
from gstlal import gsttools
from gstlal import gstpipetools
GObject.threads_init()
Gst.init(None)
......@@ -80,13 +80,13 @@ def make_element_with_src(pipeline: Gst.Pipeline, src: Union[Gst.Element, Gst.Pa
properties = {clean_property_name(key): value for key, value in properties.items()}
# Make the element
elem = gsttools.make_element(elem_type_name, name, **properties)
elem = gstpipetools.make_element(elem_type_name, name, **properties)
# Add element to pipeline
pipeline.add(elem)
# Link src argument as input to element
if gsttools.is_pad(src):
if gstpipetools.is_pad(src):
src.get_parent_element().link_pads(src, elem, None)
elif src is not None:
src.link(elem)
......
"""Unittests for encoding pipeparts"""
import pytest
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import encode, pipetools
from gstlal.pipeparts.pipetools import Gst, GObject
from gstlal.utilities import testtools
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
......@@ -2,13 +2,13 @@
import numpy
import pytest
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import filters, pipetools
from gstlal.pipeparts.pipetools import Gst, GObject
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
"""Unittests for mux pipeparts"""
import pytest
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import mux, pipetools
from gstlal.pipeparts.pipetools import Gst, GObject
from gstlal.utilities import testtools
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
......@@ -3,7 +3,7 @@ import re
import pytest
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools, pipedot
from gstlal.pipeparts.pipetools import Gst, GObject
from gstlal.utilities import testtools
......@@ -44,7 +44,7 @@ else:
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
"""Unit tests for pipetools module"""
import inspect
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools
from gstlal.utilities import testtools
......@@ -31,5 +31,5 @@ class TestPipetools:
"""Test make element"""
with testtools.GstLALTestManager(with_pipeline=True) as tm:
elem = pipetools.make_element_with_src(tm.pipeline, None, 'fakesrc')
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
"""Unittests for plotting pipeparts"""
import pytest
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools, plot
from gstlal.pipeparts.pipetools import Gst, GObject
from gstlal.utilities import testtools
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
"""Unittests for sink pipeparts"""
import pytest
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools, sink
from gstlal.pipeparts.pipetools import Gst, GObject
from gstlal.utilities import testtools
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
......@@ -2,14 +2,14 @@
import pytest
from lal import LIGOTimeGPS
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools, source
from gstlal.pipeparts.pipetools import Gst, GObject
from gstlal.utilities import testtools
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
......@@ -2,14 +2,14 @@
import pytest
from lal import LIGOTimeGPS
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools, transform
from gstlal.pipeparts.pipetools import Gst
from gstlal.utilities import testtools
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
"""Unittests for trigger pipeparts"""
import pytest
from gstlal import gsttools
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools, trigger
from gstlal.pipeparts.pipetools import Gst, GObject
from gstlal.utilities import testtools
def assert_elem_props(elem, name):
assert gsttools.is_element(elem)
assert gstpipetools.is_element(elem)
assert type(elem).__name__ == name
......
"""Unit tests for pipetools module"""
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
from lal import LIGOTimeGPS
from gstlal import gstpipetools
from gstlal.pipeparts import pipetools, source, sink
from gstlal.utilities import testtools
class TestGsttools:
"""Test class group for gsttools"""
def test_is_element(self):
"""Test is_element utility"""
assert not gstpipetools.is_element(1)
with testtools.GstLALTestManager(with_pipeline=True) as tm:
elem = pipetools.make_element_with_src(tm.pipeline, None, 'fakesrc')
assert gstpipetools.is_element(elem)
def test_is_pad(self):
"""Test is_pad utility"""
assert not gstpipetools.is_element(1)
with testtools.GstLALTestManager(with_pipeline=True) as tm:
elem = pipetools.make_element_with_src(tm.pipeline, None, 'fakesrc')
assert not gstpipetools.is_pad(elem)
assert gstpipetools.is_pad(elem.pads[0])
def test_to_caps(self):
"""Test caps coercion"""
c = gstpipetools.to_caps("audio/x-raw, format=F32LE")
assert isinstance(c, Gst.Caps)
c = gstpipetools.to_caps(c)
assert isinstance(c, Gst.Caps)
def test_make_element(self):
"""Test make element"""
e = gstpipetools.make_element('fakesrc')
assert isinstance(e, Gst.Element)
def test_make_pipeline(self):
"""Test make pipeline"""
p = gstpipetools.make_pipeline('SamplePipeline')
assert isinstance(p, Gst.Pipeline)
def test_run_pipeline(self):
"""Test run pipeline"""
pipeline = gstpipetools.make_pipeline('SamplePipelineRun')
head = source.audio_test(pipeline, wave=source.AudioTestWaveform.Sine)
head = sink.fake(pipeline, head)
gstpipetools.run_pipeline(pipeline, segment=(0, 1))
def test_seek(self):
"""Test seek pipeline"""
p = gstpipetools.make_pipeline('SamplePipeline')
p.set_state(Gst.State.READY)
gstpipetools.seek(p, segment=gstpipetools.LIVE_SEGMENT)
def test_seek_args(self):
"""Test seek args"""
assert gstpipetools.seek_args(None) == (Gst.SeekType.NONE, -1)
assert gstpipetools.seek_args(LIGOTimeGPS(123456)) == (Gst.SeekType.SET, LIGOTimeGPS(123456).ns())
assert gstpipetools.seek_args(123456) == (Gst.SeekType.SET, int(float(123456) * Gst.SECOND))
"""Unit tests for pipetools module"""
from gstlal import gsttools
from gstlal.pipeparts import pipetools
from gstlal.utilities import testtools
class TestGsttools:
"""Test class group for gsttools"""
def test_is_element(self):
"""Test is_element utility"""
assert not gsttools.is_element(1)
with testtools.GstLALTestManager(with_pipeline=True) as tm:
elem = pipetools.make_element_with_src(tm.pipeline, None, 'fakesrc')
assert gsttools.is_element(elem)
def test_is_pad(self):
"""Test is_pad utility"""
assert not gsttools.is_element(1)
with testtools.GstLALTestManager(with_pipeline=True) as tm:
elem = pipetools.make_element_with_src(tm.pipeline, None, 'fakesrc')
assert not gsttools.is_pad(elem)
assert gsttools.is_pad(elem.pads[0])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment