diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000000000000000000000000000000000000..6c2fc59b24dc9f978f466afae074bdb70e6d74d1 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,7 @@ +# Configuration file for pytest within gstlal-calibration +[pytest] +norecursedirs = gst/python share tests +testpaths = tests/tests_pytest python +addopts = + -v + --doctest-modules diff --git a/tests/tests_pytest/test_pyfilter.py b/tests/tests_pytest/test_pyfilter.py new file mode 100755 index 0000000000000000000000000000000000000000..1208ac9b06a51fe9ea9cfe85d5a99e2b7ceb1cc4 --- /dev/null +++ b/tests/tests_pytest/test_pyfilter.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +# Copyright (C) 2016 Aaron Viets +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + +from gstlal import calibration_parts +from gstlal import pipeparts +from utils import common + + +# +# ============================================================================= +# +# Pipelines +# +# ============================================================================= +# + +def pyfilter_test_01(pipeline, name): + # + # This test removes the DC component from a stream of ones (i.e., the result should be zero) + # + + rate = 16384 # Hz + buffer_length = 1.0 # seconds + test_duration = 10.0 # seconds + DC = 1.0 + wave = 0 + freq = 90 + volume = 1.0 + + # + # build pipeline + # + + src = common.test_src(pipeline, buffer_length=buffer_length, rate=rate, freq=freq, test_duration=test_duration, wave=wave, width=64) + head = pipeparts.mkaudioamplify(pipeline, src, volume) + head = pipeparts.mkgeneric(pipeline, head, "lal_add_constant", value=DC) + head = pipeparts.mktee(pipeline, head) + pipeparts.mknxydumpsink(pipeline, head, "%s_in.txt" % name) + head = calibration_parts.bandstop(pipeline, head, rate) + pipeparts.mknxydumpsink(pipeline, head, "%s_out.txt" % name) + + # + # done + # + + return pipeline + + +class TestPyFilter: + """PyFilter test class""" + + def test_pyfilter(self): + """Test pyfilter""" + common.build_and_run(pyfilter_test_01, "pyfilter_test_01") diff --git a/tests/tests_pytest/utils/__init__.py b/tests/tests_pytest/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/tests_pytest/utils/common.py b/tests/tests_pytest/utils/common.py new file mode 100644 index 0000000000000000000000000000000000000000..42a2225d5c911661aa1b0ed0e35d48d1cb2007f5 --- /dev/null +++ b/tests/tests_pytest/utils/common.py @@ -0,0 +1,185 @@ +# Copyright (C) 2009--2011,2013 Kipp Cannon +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +# +# ============================================================================= +# +# Preamble +# +# ============================================================================= +# + + +import numpy +import sys + + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import GObject +from gi.repository import Gst + + +from gstlal import pipeparts +from gstlal import pipeio +from gstlal import simplehandler +from gstlal import datasource + + +GObject.threads_init() +Gst.init(None) + + +if sys.byteorder == "little": + BYTE_ORDER = "LE" +else: + BYTE_ORDER = "BE" + + +# +# ============================================================================= +# +# Utilities +# +# ============================================================================= +# + + +def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True, src_suffix = ""): + head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, samplesperbuffer = int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live) + head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=2" % (width, BYTE_ORDER, rate)) + head = pipeparts.mktogglecomplex(pipeline, head) + if verbose: + head = pipeparts.mkprogressreport(pipeline, head, "src%s" % src_suffix) + return head + +def int_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True): + head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, samplesperbuffer = int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live) + head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=S%d%s, rate=%d, channels=%d" % (width, BYTE_ORDER, rate, channels)) + if verbose: + head = pipeparts.mkprogressreport(pipeline, head, "src") + return head + +def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, volume = 1, is_live = False, verbose = True, src_suffix = ""): + if wave == "ligo": + head = pipeparts.mkfakeLIGOsrc(pipeline, instrument = "H1", channel_name = "LSC-STRAIN") + else: + head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = volume, samplesperbuffer = int(buffer_length * rate), num_buffers = int(test_duration / buffer_length), is_live = is_live) + head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=%d, channel-mask=(bitmask)0x0" % (width, BYTE_ORDER, rate, channels)) + if verbose: + head = pipeparts.mkprogressreport(pipeline, head, "src%s" % src_suffix) + return head + + +def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, volume = 0.8, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True): + src = test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, volume = volume, is_live = is_live, verbose = verbose) + if gap_frequency is None: + return src + control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate)) + if control_dump_filename is not None: + control = pipeparts.mktee(pipeline, control) + pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename) + control = pipeparts.mkqueue(pipeline, control) + return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold) + +def gapped_int_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True): + src = int_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose) + if gap_frequency is None: + return src + control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate)) + if control_dump_filename is not None: + control = pipeparts.mktee(pipeline, control) + pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename) + control = pipeparts.mkqueue(pipeline, control) + return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold) + +def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True): + src = complex_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose) + if tags is not None: + src = pipeparts.mktaginject(pipeline, src, tags) + if gap_frequency is None: + return src + control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate)) + if control_dump_filename is not None: + control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename) + control = pipeparts.mkqueue(pipeline, control) + return pipeparts.mktogglecomplex(pipeline, pipeparts.mkgate(pipeline, pipeparts.mktogglecomplex(pipeline, src), control = control, threshold = gap_threshold)) + + +# +# ============================================================================= +# +# Pipeline Builder +# +# ============================================================================= +# + + +def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs): + print("=== Running Test %s ===" % name) + mainloop = GObject.MainLoop() + pipeline = pipelinefunc(Gst.Pipeline(name = name), name, **pipelinefunc_kwargs) + handler = simplehandler.Handler(mainloop, pipeline) + if segment is not None: + if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS: + raise RuntimeError("pipeline failed to enter READY state") + datasource.pipeline_seek_for_gps(pipeline, segment[0].ns() / 1000000000, segment[1].ns() / 1000000000) + if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE: + raise RuntimeError("pipeline failed to enter PLAYING state") + # TODO: find a way to write to temp files + # pipeparts.write_dump_dot(pipeline, "test_%s" % name, verbose = True) + mainloop.run() + + +# +# ============================================================================= +# +# Push Arrays Through an Element +# +# ============================================================================= +# + + +def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs): + input_arrays = list(input_arrays) # so we can modify it + output_arrays = [] + + pipeline = Gst.Pipeline(name = name) + + head = pipeparts.mkgeneric(pipeline, None, "appsrc", caps = pipeio.caps_from_array(input_arrays[0], rate = rate)) + def need_data(elem, arg, input_arrays, rate): + if input_arrays: + arr = input_arrays.pop(0) + elem.set_property("caps", pipeio.caps_from_array(arr, rate)) + buf = pipeio.audio_buffer_from_array(arr, 0, 0, rate) + elem.emit("push-buffer", pipeio.audio_buffer_from_array(arr, 0, 0, rate)) + return Gst.FlowReturn.OK + else: + elem.emit("end-of-stream") + return Gst.FlowReturn.EOS + head.connect("need-data", need_data, input_arrays, rate) + + head = elemfunc(pipeline, head, **elemfunc_kwargs) + + head = pipeparts.mkappsink(pipeline, head) + def appsink_get_array(elem, output_arrays): + output_arrays.append(pipeio.array_from_audio_sample(elem.emit("pull-sample"))) + return Gst.FlowReturn.OK + + head.connect("new-sample", appsink_get_array, output_arrays) + build_and_run((lambda *args, **kwargs: pipeline), name) + + return output_arrays