From 20446844e66ca474bd94bbe6d282b162fba1dc2e Mon Sep 17 00:00:00 2001
From: James Kennington <>
Date: Thu, 12 Aug 2021 13:48:35 +0000
Subject: [PATCH] Add pytest setup for each package

 pytest.ini                           |   7 +
 tests/tests_pytest/  |  69 ++++++++++
 tests/tests_pytest/utils/ |   0
 tests/tests_pytest/utils/   | 185 +++++++++++++++++++++++++++
 4 files changed, 261 insertions(+)
 create mode 100644 pytest.ini
 create mode 100755 tests/tests_pytest/
 create mode 100644 tests/tests_pytest/utils/
 create mode 100644 tests/tests_pytest/utils/

diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 000000000..6c2fc59b2
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,7 @@
+# Configuration file for pytest within gstlal-calibration
+norecursedirs = gst/python share tests
+testpaths = tests/tests_pytest python
+addopts =
+    -v
+    --doctest-modules
diff --git a/tests/tests_pytest/ b/tests/tests_pytest/
new file mode 100755
index 000000000..1208ac9b0
--- /dev/null
+++ b/tests/tests_pytest/
@@ -0,0 +1,69 @@
+#!/usr/bin/env python3
+# Copyright (C) 2016  Aaron Viets
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+from gstlal import calibration_parts
+from gstlal import pipeparts
+from utils import common
+# =============================================================================
+#				  Pipelines
+# =============================================================================
+def pyfilter_test_01(pipeline, name):
+	#
+	# This test removes the DC component from a stream of ones (i.e., the result should be zero)
+	#
+	rate = 16384  # Hz
+	buffer_length = 1.0  # seconds
+	test_duration = 10.0  # seconds
+	DC = 1.0
+	wave = 0
+	freq = 90
+	volume = 1.0
+	#
+	# build pipeline
+	#
+	src = common.test_src(pipeline, buffer_length=buffer_length, rate=rate, freq=freq, test_duration=test_duration, wave=wave, width=64)
+	head = pipeparts.mkaudioamplify(pipeline, src, volume)
+	head = pipeparts.mkgeneric(pipeline, head, "lal_add_constant", value=DC)
+	head = pipeparts.mktee(pipeline, head)
+	pipeparts.mknxydumpsink(pipeline, head, "%s_in.txt" % name)
+	head = calibration_parts.bandstop(pipeline, head, rate)
+	pipeparts.mknxydumpsink(pipeline, head, "%s_out.txt" % name)
+	#
+	# done
+	#
+	return pipeline
+class TestPyFilter:
+	"""PyFilter test class"""
+	def test_pyfilter(self):
+		"""Test pyfilter"""
+		common.build_and_run(pyfilter_test_01, "pyfilter_test_01")
diff --git a/tests/tests_pytest/utils/ b/tests/tests_pytest/utils/
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/tests_pytest/utils/ b/tests/tests_pytest/utils/
new file mode 100644
index 000000000..42a2225d5
--- /dev/null
+++ b/tests/tests_pytest/utils/
@@ -0,0 +1,185 @@
+# Copyright (C) 2009--2011,2013  Kipp Cannon
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+# =============================================================================
+#				   Preamble
+# =============================================================================
+import numpy
+import sys
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import GObject
+from gi.repository import Gst
+from gstlal import pipeparts
+from gstlal import pipeio
+from gstlal import simplehandler
+from gstlal import datasource
+if sys.byteorder == "little":
+# =============================================================================
+#				  Utilities
+# =============================================================================
+def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True, src_suffix = ""):
+	head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, samplesperbuffer = int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live)
+	head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=2" % (width, BYTE_ORDER, rate))
+	head = pipeparts.mktogglecomplex(pipeline, head)
+	if verbose:
+		head = pipeparts.mkprogressreport(pipeline, head, "src%s" % src_suffix)
+	return head
+def int_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
+	head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, samplesperbuffer = int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live)
+	head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=S%d%s, rate=%d, channels=%d" % (width, BYTE_ORDER, rate, channels))
+	if verbose:
+		head = pipeparts.mkprogressreport(pipeline, head, "src")
+	return head
+def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, volume = 1, is_live = False, verbose = True, src_suffix = ""):
+	if wave == "ligo":
+		head = pipeparts.mkfakeLIGOsrc(pipeline, instrument = "H1", channel_name = "LSC-STRAIN")
+	else:
+		head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = volume, samplesperbuffer = int(buffer_length * rate), num_buffers = int(test_duration / buffer_length), is_live = is_live)
+		head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=%d, channel-mask=(bitmask)0x0" % (width, BYTE_ORDER, rate, channels))
+	if verbose:
+		head = pipeparts.mkprogressreport(pipeline, head, "src%s" % src_suffix)
+	return head
+def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, volume = 0.8, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True):
+	src = test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, volume = volume, is_live = is_live, verbose = verbose)
+	if gap_frequency is None:
+		return src
+	control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
+	if control_dump_filename is not None:
+		control = pipeparts.mktee(pipeline, control)
+		pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
+		control = pipeparts.mkqueue(pipeline, control)
+	return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold)
+def gapped_int_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True):
+	src = int_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
+	if gap_frequency is None:
+		return src
+	control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
+	if control_dump_filename is not None:
+		control = pipeparts.mktee(pipeline, control)
+		pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
+		control = pipeparts.mkqueue(pipeline, control)
+	return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold)
+def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
+	src = complex_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
+	if tags is not None:
+		src = pipeparts.mktaginject(pipeline, src, tags)
+	if gap_frequency is None:
+		return src
+	control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
+	if control_dump_filename is not None:
+		control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
+		control = pipeparts.mkqueue(pipeline, control)
+	return pipeparts.mktogglecomplex(pipeline, pipeparts.mkgate(pipeline, pipeparts.mktogglecomplex(pipeline, src), control = control, threshold = gap_threshold))
+# =============================================================================
+#			       Pipeline Builder
+# =============================================================================
+def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs):
+	print("=== Running Test %s ===" % name)
+	mainloop = GObject.MainLoop()
+	pipeline = pipelinefunc(Gst.Pipeline(name = name), name, **pipelinefunc_kwargs)
+	handler = simplehandler.Handler(mainloop, pipeline)
+	if segment is not None:
+		if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
+			raise RuntimeError("pipeline failed to enter READY state")
+		datasource.pipeline_seek_for_gps(pipeline, segment[0].ns() / 1000000000, segment[1].ns() / 1000000000)
+	if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
+		raise RuntimeError("pipeline failed to enter PLAYING state")
+	# TODO: find a way to write to temp files
+	# pipeparts.write_dump_dot(pipeline, "test_%s" % name, verbose = True)
+# =============================================================================
+#			Push Arrays Through an Element
+# =============================================================================
+def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs):
+	input_arrays = list(input_arrays)	# so we can modify it
+	output_arrays = []
+	pipeline = Gst.Pipeline(name = name)
+	head = pipeparts.mkgeneric(pipeline, None, "appsrc", caps = pipeio.caps_from_array(input_arrays[0], rate = rate))
+	def need_data(elem, arg, input_arrays, rate):
+		if input_arrays:
+			arr = input_arrays.pop(0)
+			elem.set_property("caps", pipeio.caps_from_array(arr, rate))
+			buf = pipeio.audio_buffer_from_array(arr, 0, 0, rate)
+			elem.emit("push-buffer", pipeio.audio_buffer_from_array(arr, 0, 0, rate))
+			return Gst.FlowReturn.OK
+		else:
+			elem.emit("end-of-stream")
+			return Gst.FlowReturn.EOS
+	head.connect("need-data", need_data, input_arrays, rate)
+	head = elemfunc(pipeline, head, **elemfunc_kwargs)
+	head = pipeparts.mkappsink(pipeline, head)
+	def appsink_get_array(elem, output_arrays):
+		output_arrays.append(pipeio.array_from_audio_sample(elem.emit("pull-sample")))
+		return Gst.FlowReturn.OK
+	head.connect("new-sample", appsink_get_array, output_arrays)
+	build_and_run((lambda *args, **kwargs: pipeline), name)
+	return output_arrays