Skip to content
Snippets Groups Projects
Commit 0ec63949 authored by James Kennington's avatar James Kennington
Browse files

Add pytest setup for each package

parent 47bb0b88
No related branches found
No related tags found
1 merge request!51Add pytest setup for each package
Pipeline #271230 passed
Showing
with 5308 additions and 5 deletions
...@@ -41,8 +41,10 @@ stages: ...@@ -41,8 +41,10 @@ stages:
- level2 - level2
- test-gstlal - test-gstlal
- test-gstlal-only-ugly - test-gstlal-only-ugly
- test-inspiral - test-gstlal-ugly
- test-burst - test-burst
- test-calibration
- test-inspiral
- test-offline - test-offline
- docker - docker
- docker-latest - docker-latest
...@@ -219,13 +221,44 @@ test:gstlal-inspiral: ...@@ -219,13 +221,44 @@ test:gstlal-inspiral:
- export GSTLAL_FIR_WHITEN=0 - export GSTLAL_FIR_WHITEN=0
- gst-inspect-1.0 - gst-inspect-1.0
# Get the necessary ROM data:
- git clone https://git.ligo.org/alexander.pace/gstlal-testing-data.git ${GSTLAL_DIR}/gstlal-testing-data
- export LAL_DATA_PATH=${GSTLAL_DIR}/gstlal-testing-data/
# Run doctests
- cd gstlal-inspiral - cd gstlal-inspiral
- python3 -m pytest -v --doctest-modules --ignore gst/python --ignore tests --ignore python/snglinspiraltable.py --ignore python/spawaveform.py --ignore python/imr_utils.py --ignore python/stats/inspiral_extrinsics.py --ignore python/templates.py --ignore python/inspiral_pipe.py --ignore python/p_astro_gstlal.py --ignore python/rio.py - python3 -m pytest -c pytest.ini
only:
- schedules
- pushes
allow_failure: true
test:gstlal-ugly:
interruptible: true
image: containers.ligo.org/alexander.pace/gstlal-dev/gstlal-dev:el7
stage: test-gstlal-ugly
needs:
- level0:rpm:gstlal
- level1:rpm:gstlal-ugly
script:
# Install RPMs and set up the test environment:
- if [ -d rpmbuild ]; then yum -y install rpmbuild/RPMS/x86_64/*.rpm; fi
- export GSTLAL_FIR_WHITEN=0
- gst-inspect-1.0
# Get the necessary ROM data:
- git clone https://git.ligo.org/alexander.pace/gstlal-testing-data.git ${GSTLAL_DIR}/gstlal-testing-data
- export LAL_DATA_PATH=${GSTLAL_DIR}/gstlal-testing-data/
# Run doctests
- cd gstlal-ugly
- python3 -m pytest -c pytest.ini
only: only:
- schedules - schedules
- pushes - pushes
allow_failure: true allow_failure: true
test:gstlal-burst: test:gstlal-burst:
interruptible: true interruptible: true
image: containers.ligo.org/alexander.pace/gstlal-dev/gstlal-dev:el7 image: containers.ligo.org/alexander.pace/gstlal-dev/gstlal-dev:el7
...@@ -240,7 +273,28 @@ test:gstlal-burst: ...@@ -240,7 +273,28 @@ test:gstlal-burst:
- export GSTLAL_FIR_WHITEN=0 - export GSTLAL_FIR_WHITEN=0
- gst-inspect-1.0 - gst-inspect-1.0
- cd gstlal-burst - cd gstlal-burst
- python3 -m pytest -v --doctest-modules --ignore python/excesspower --ignore python/string --ignore tests/trigger_test_01.py - python3 -m pytest -c pytest.ini
only:
- schedules
- pushes
allow_failure: true
test:gstlal-calibration:
interruptible: true
image: containers.ligo.org/alexander.pace/gstlal-dev/gstlal-dev:el7
stage: test-calibration
needs:
- level0:rpm:gstlal
- level1:rpm:gstlal-ugly
- level2:rpm:gstlal-calibration
script:
# Install RPMs and set up the test environment:
- if [ -d rpmbuild ]; then yum -y install rpmbuild/RPMS/x86_64/*.rpm; fi
- export GSTLAL_FIR_WHITEN=0
- gst-inspect-1.0
- cd gstlal-calibration
- python3 -m pytest -c pytest.ini
only: only:
- schedules - schedules
- pushes - pushes
......
# Configuration file for pytest within gstlal-burst
[pytest]
norecursedirs = gst/python share
testpaths = tests/tests_pytest python
addopts =
-v
--doctest-modules
--ignore python/excesspower
--ignore python/string
--ignore tests/test_trigger_01.py
This diff is collapsed.
"""Unittests for trigger test 01
"""
import os
from collections import deque
from io import StringIO
import gi
from gstlal import pipeparts, simplehandler
from utils import common
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
####################
#
# classes
#
####################
class MultiChannelHandler(simplehandler.Handler):
def __init__(self, *args, **kwargs):
self.output = kwargs.pop("output")
self.instrument = kwargs.pop("instrument")
super(MultiChannelHandler, self).__init__(*args, **kwargs)
self.timedeq = deque(maxlen=10000)
def do_on_message(self, bus, message):
return False
def prehandler(self, elem):
buf = elem.emit("pull-preroll")
del buf
return Gst.FlowReturn.OK
# def bufhandler(self, elem, sink_dict):
def bufhandler(self, elem):
buf = elem.emit("pull-sample").get_buffer()
buftime = int(buf.pts / 1e9)
(result, mapinfo) = buf.map(Gst.MapFlags.READ)
assert result
if mapinfo.data:
data = StringIO.StringIO(mapinfo.data).getvalue()
# channel = sink_dict[elem]
fdata = ""
for line in data.split('\n'):
if len(line) > 0:
# fdata += "%s\t" % channel + line.rstrip('\r') + "\n"
fdata += line.rstrip('\r') + "\n"
self.timedeq.append(buftime)
else:
buf.unmap(mapinfo)
del buf
return Gst.FlowReturn.OK
# Save a "latest"
self.to_trigger_file(os.path.join(self.output, "output_triggers.txt"), fdata)
buf.unmap(mapinfo)
del buf
return Gst.FlowReturn.OK
def to_trigger_file(self, path, data):
# print path
with open(path, 'a') as f:
f.write(data)
#
# =============================================================================
#
# Pipeline Builder
#
# =============================================================================
#
def build_and_run(pipelinefunc, name, segment=None, **pipelinefunc_kwargs):
for key, value in pipelinefunc_kwargs.items():
print("{0} = {1}".format(key, value))
print("=== Running Test %s ===" % name)
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(name=name)
handler = MultiChannelHandler(mainloop, pipeline, output="./", instrument=None)
pipeline = pipelinefunc(pipeline, name, handler, **pipelinefunc_kwargs)
if segment is not None:
if pipeline.set_state(Gst.State.PAUSED) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
pipeline.seek(1.0, Gst.Format(Gst.Format.TIME), Gst.SeekFlags.FLUSH, Gst.SeekType.SET, segment[0].ns(), Gst.SeekType.SET, segment[1].ns())
# TODO: figure out why the below line causes process to crash
# if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
# raise RuntimeError("pipeline failed to enter PLAYING state")
# mainloop.run()
#
# =============================================================================
#
# Pipelines
#
# =============================================================================
#
#
# check for proper peak finding
#
def peak_test_01a(pipeline, name, handler):
#
# try changing these. test should still work!
#
initial_channels = 2
rate = 2048 # Hz
width = 32
sine_frequency = 1
gap_frequency = 0.1 # Hz
gap_threshold = 0.7 # of 1
buffer_length = 1.0 # seconds
test_duration = 10.0 # seconds
peak_window = 2048 # samples
wave = 0
#
# build pipeline
#
head = common.gapped_test_src(pipeline, buffer_length=buffer_length, rate=rate, width=width, channels=initial_channels, test_duration=test_duration,
gap_frequency=gap_frequency, gap_threshold=gap_threshold, control_dump_filename="%s_control.dump" % name)
head = pipeparts.mktogglecomplex(pipeline, head)
head = pipeparts.mktaginject(pipeline, head, "instrument=H1,channel-name=LSC-STRAIN,units=strain")
# head = common.gapped_complex_test_src(pipeline, buffer_length = buffer_length, rate = in_rate, test_duration = test_duration, wave = wave, freq = sine_frequency, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = "itac_test_01a_control.dump", tags = "instrument=H1,channel-name=LSC-STRAIN,units=strain")
# head = tee = pipeparts.mktee(pipeline, head)
# pipeparts.mktrigger(pipeline, head, peak_window, "test_bank.xml")
# Does not recieve EOS, hangs
# pipeparts.mktrigger(pipeline, head, peak_window,autocorrelation_matrix = numpy.ones((1,21), dtype=numpy.complex))
# head = pipeparts.mkqueue(pipeline, pipeparts.mkitac(pipeline, head, peak_window, "test_bank.xml", autocorrelation_matrix = numpy.array([[0+0.j, 0+0.j, 1+1.j, 0+0.j, 0+0.j]])))
# head = pipeparts.mkprogressreport(pipeline, head, "test")
#
# output the before and after
#
# pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, head), "%s_out.dump" % name)
# pipeparts.mkfakesink(pipeline, head)
# a = pipeparts.mkappsink(pipeline, pipeparts.mkqueue(pipeline, head))
head = pipeparts.mkgeneric(pipeline, head, "lal_nxydump")
sink = pipeparts.mkappsink(pipeline, head, max_buffers=1, sync=False)
sink.connect("new-sample", handler.bufhandler)
sink.connect("new-preroll", handler.prehandler)
# outfile = open("itac_test_01a_out.dump", "w")
# def dump_triggers(elem, output = outfile):
# for row in SnglInspiralTable.from_buffer(elem.emit("pull-buffer")):
# print >>outfile, row.end_time + row.end_time_ns*1e-9, row.snr, row.chisq, row.chisq_dof
# a.connect_after("new-buffer", dump_triggers)
# pipeparts.mknxydumpsink(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkqueue(pipeline, tee)), "itac_test_01a_in.dump")
#
# done
#
# if "GST_DEBUG_DUMP_DOT_DIR" in os.environ:
# gst.DEBUG_BIN_TO_DOT_FILE(pipeline, gst.DEBUG_GRAPH_SHOW_ALL, "peak_test_01a")
return pipeline
class TestTrigger01:
"""Test class group for tigger 01"""
def test_peak_01a(self):
"""Test peak"""
build_and_run(peak_test_01a, "peak_test_01a")
assert True
# Copyright (C) 2009--2011,2013 Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import numpy
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
from gstlal import pipeparts
from gstlal import pipeio
from gstlal import simplehandler
GObject.threads_init()
Gst.init(None)
if sys.byteorder == "little":
BYTE_ORDER = "LE"
else:
BYTE_ORDER = "BE"
#
# =============================================================================
#
# Utilities
#
# =============================================================================
#
def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
assert not width % 8
samplesperbuffer = int(round(buffer_length * rate))
head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = 1, blocksize = (width / 8 * 2) * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length)), is_live = is_live)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=Z%d%s, rate=%d, channels=2" % (width, BYTE_ORDER, rate))
head = pipeparts.mktogglecomplex(pipeline, head)
if verbose:
head = pipeparts.mkprogressreport(pipeline, head, "src")
return head
def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
assert not width % 8
if wave == "ligo":
head = pipeparts.mkfakeLIGOsrc(pipeline, instrument = "H1", channel_name = "LSC-STRAIN")
else:
samplesperbuffer = int(round(buffer_length * rate))
head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = 1, blocksize = (width / 8 * channels) * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length)), is_live = is_live)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=%d" % (width, BYTE_ORDER, rate, channels))
if verbose:
head = pipeparts.mkprogressreport(pipeline, head, "src")
return head
def add_gaps(pipeline, head, buffer_length, rate, test_duration, gap_frequency = None, gap_threshold = None, control_dump_filename = None):
if gap_frequency is None:
return head
samplesperbuffer = int(round(buffer_length * rate))
control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, volume = 1, blocksize = 4 * samplesperbuffer, samplesperbuffer = samplesperbuffer, num_buffers = int(round(test_duration / buffer_length))), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
if control_dump_filename is not None:
control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
control = pipeparts.mkqueue(pipeline, control)
return pipeparts.mkgate(pipeline, head, control = control, threshold = gap_threshold)
def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
src = test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
if tags is not None:
src = pipeparts.mktaginject(pipeline, src, tags)
return add_gaps(pipeline, src, buffer_length = buffer_length, rate = rate, test_duration = test_duration, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = control_dump_filename)
def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
src = complex_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
if tags is not None:
src = pipeparts.mktaginject(pipeline, src, tags)
return pipeparts.mktogglecomplex(pipeline, add_gaps(pipeline, pipeparts.mktogglecomplex(pipeline, src), buffer_length = buffer_length, rate = rate, test_duration = test_duration, gap_frequency = gap_frequency, gap_threshold = gap_threshold, control_dump_filename = control_dump_filename))
#
# =============================================================================
#
# Pipeline Builder
#
# =============================================================================
#
def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs):
print("=== Running Test %s ===" % name, file=sys.stderr)
mainloop = GObject.MainLoop()
pipeline = pipelinefunc(Gst.Pipeline(name = name), name, **pipelinefunc_kwargs)
handler = simplehandler.Handler(mainloop, pipeline)
if segment is not None:
if pipeline.set_state(Gst.State.PAUSED) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
pipeline.seek(1.0, Gst.Format(Gst.Format.TIME), Gst.SeekFlags.FLUSH, Gst.SeekType.SET, segment[0].ns(), Gst.SeekType.SET, segment[1].ns())
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
mainloop.run()
#
# =============================================================================
#
# Push Arrays Through an Element
#
# =============================================================================
#
def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs):
input_arrays = list(input_arrays) # so we can modify it
output_arrays = []
pipeline = Gst.Pipeline(name = name)
head = pipeparts.mkgeneric(pipeline, None, "appsrc", caps = pipeio.caps_from_array(input_arrays[0], rate = rate))
def need_data(elem, arg, input_array_rate_pair):
input_arrays, rate = input_array_rate_pair
if input_arrays:
arr = input_arrays.pop(0)
elem.set_property("caps", pipeio.caps_from_array(arr, rate))
buf = pipeio.audio_buffer_from_array(arr, 0, 0, rate)
elem.emit("push-buffer", pipeio.audio_buffer_from_array(arr, 0, 0, rate))
return Gst.FlowReturn.OK
else:
elem.emit("end-of-stream")
return Gst.FlowReturn.EOS
head.connect("need-data", need_data, (input_arrays, rate))
head = elemfunc(pipeline, head, **elemfunc_kwargs)
head = pipeparts.mkappsink(pipeline, head)
def appsink_get_array(elem, output_arrays):
output_arrays.append(pipeio.array_from_audio_sample(elem.emit("pull-sample")))
return Gst.FlowReturn.OK
head.connect("new-sample", appsink_get_array, output_arrays)
build_and_run((lambda *args, **kwargs: pipeline), name)
return output_arrays
# Configuration file for pytest within gstlal-calibration
[pytest]
norecursedirs = gst/python share tests
testpaths = tests/tests_pytest python
addopts =
-v
--doctest-modules
#!/usr/bin/env python3
# Copyright (C) 2016 Aaron Viets
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from gstlal import calibration_parts
from gstlal import pipeparts
from utils import common
#
# =============================================================================
#
# Pipelines
#
# =============================================================================
#
def pyfilter_test_01(pipeline, name):
#
# This test removes the DC component from a stream of ones (i.e., the result should be zero)
#
rate = 16384 # Hz
buffer_length = 1.0 # seconds
test_duration = 10.0 # seconds
DC = 1.0
wave = 0
freq = 90
volume = 1.0
#
# build pipeline
#
src = common.test_src(pipeline, buffer_length=buffer_length, rate=rate, freq=freq, test_duration=test_duration, wave=wave, width=64)
head = pipeparts.mkaudioamplify(pipeline, src, volume)
head = pipeparts.mkgeneric(pipeline, head, "lal_add_constant", value=DC)
head = pipeparts.mktee(pipeline, head)
pipeparts.mknxydumpsink(pipeline, head, "%s_in.txt" % name)
head = calibration_parts.bandstop(pipeline, head, rate)
pipeparts.mknxydumpsink(pipeline, head, "%s_out.txt" % name)
#
# done
#
return pipeline
class TestPyFilter:
"""PyFilter test class"""
def test_pyfilter(self):
"""Test pyfilter"""
common.build_and_run(pyfilter_test_01, "pyfilter_test_01")
# Copyright (C) 2009--2011,2013 Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import numpy
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
from gstlal import pipeparts
from gstlal import pipeio
from gstlal import simplehandler
from gstlal import datasource
GObject.threads_init()
Gst.init(None)
if sys.byteorder == "little":
BYTE_ORDER = "LE"
else:
BYTE_ORDER = "BE"
#
# =============================================================================
#
# Utilities
#
# =============================================================================
#
def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True, src_suffix = ""):
head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, samplesperbuffer = int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=2" % (width, BYTE_ORDER, rate))
head = pipeparts.mktogglecomplex(pipeline, head)
if verbose:
head = pipeparts.mkprogressreport(pipeline, head, "src%s" % src_suffix)
return head
def int_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, samplesperbuffer = int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=S%d%s, rate=%d, channels=%d" % (width, BYTE_ORDER, rate, channels))
if verbose:
head = pipeparts.mkprogressreport(pipeline, head, "src")
return head
def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, volume = 1, is_live = False, verbose = True, src_suffix = ""):
if wave == "ligo":
head = pipeparts.mkfakeLIGOsrc(pipeline, instrument = "H1", channel_name = "LSC-STRAIN")
else:
head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, volume = volume, samplesperbuffer = int(buffer_length * rate), num_buffers = int(test_duration / buffer_length), is_live = is_live)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, format=F%d%s, rate=%d, channels=%d, channel-mask=(bitmask)0x0" % (width, BYTE_ORDER, rate, channels))
if verbose:
head = pipeparts.mkprogressreport(pipeline, head, "src%s" % src_suffix)
return head
def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, volume = 0.8, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True):
src = test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, volume = volume, is_live = is_live, verbose = verbose)
if gap_frequency is None:
return src
control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
if control_dump_filename is not None:
control = pipeparts.mktee(pipeline, control)
pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
control = pipeparts.mkqueue(pipeline, control)
return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold)
def gapped_int_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True):
src = int_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
if gap_frequency is None:
return src
control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
if control_dump_filename is not None:
control = pipeparts.mktee(pipeline, control)
pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
control = pipeparts.mkqueue(pipeline, control)
return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold)
def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
src = complex_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
if tags is not None:
src = pipeparts.mktaginject(pipeline, src, tags)
if gap_frequency is None:
return src
control = pipeparts.mkcapsfilter(pipeline, pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 8 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length)), "audio/x-raw, format=F32%s, rate=%d, channels=1" % (BYTE_ORDER, rate))
if control_dump_filename is not None:
control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
control = pipeparts.mkqueue(pipeline, control)
return pipeparts.mktogglecomplex(pipeline, pipeparts.mkgate(pipeline, pipeparts.mktogglecomplex(pipeline, src), control = control, threshold = gap_threshold))
#
# =============================================================================
#
# Pipeline Builder
#
# =============================================================================
#
def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs):
print("=== Running Test %s ===" % name)
mainloop = GObject.MainLoop()
pipeline = pipelinefunc(Gst.Pipeline(name = name), name, **pipelinefunc_kwargs)
handler = simplehandler.Handler(mainloop, pipeline)
if segment is not None:
if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline failed to enter READY state")
datasource.pipeline_seek_for_gps(pipeline, segment[0].ns() / 1000000000, segment[1].ns() / 1000000000)
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
# TODO: find a way to write to temp files
# pipeparts.write_dump_dot(pipeline, "test_%s" % name, verbose = True)
mainloop.run()
#
# =============================================================================
#
# Push Arrays Through an Element
#
# =============================================================================
#
def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs):
input_arrays = list(input_arrays) # so we can modify it
output_arrays = []
pipeline = Gst.Pipeline(name = name)
head = pipeparts.mkgeneric(pipeline, None, "appsrc", caps = pipeio.caps_from_array(input_arrays[0], rate = rate))
def need_data(elem, arg, input_arrays, rate):
if input_arrays:
arr = input_arrays.pop(0)
elem.set_property("caps", pipeio.caps_from_array(arr, rate))
buf = pipeio.audio_buffer_from_array(arr, 0, 0, rate)
elem.emit("push-buffer", pipeio.audio_buffer_from_array(arr, 0, 0, rate))
return Gst.FlowReturn.OK
else:
elem.emit("end-of-stream")
return Gst.FlowReturn.EOS
head.connect("need-data", need_data, input_arrays, rate)
head = elemfunc(pipeline, head, **elemfunc_kwargs)
head = pipeparts.mkappsink(pipeline, head)
def appsink_get_array(elem, output_arrays):
output_arrays.append(pipeio.array_from_audio_sample(elem.emit("pull-sample")))
return Gst.FlowReturn.OK
head.connect("new-sample", appsink_get_array, output_arrays)
build_and_run((lambda *args, **kwargs: pipeline), name)
return output_arrays
# Configuration file for pytest within gstlal-inspiral
[pytest]
norecursedirs = gst/python share tests
testpaths = tests/tests_pytest python
addopts =
-v
--doctest-modules
--ignore gst/python
--ignore tests
--ignore python/snglinspiraltable.py
--ignore python/spawaveform.py
--ignore python/imr_utils.py
--ignore python/stats/inspiral_extrinsics.py
--ignore python/templates.py
--ignore python/inspiral_pipe.py
--ignore python/p_astro_gstlal.py
--ignore python/rio.py
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import numpy
from gstlal import pipeparts
from gstlal.snglinspiraltable import GSTLALSnglInspiral
from utils import common
#
# =============================================================================
#
# Pipelines
#
# =============================================================================
#
#
# check for proper peak finding
#
def peak_test_01a(pipeline):
#
# try changing these. test should still work!
#
in_rate = 32 # Hz
sine_frequency = 1
gap_frequency = 0.1 # Hz
gap_threshold = 0.7 # of 1
buffer_length = 1.0 # seconds
test_duration = 10.0 # seconds
peak_window = 16 # samples
wave = 0
#
# build pipeline
#
head = common.gapped_complex_test_src(pipeline, buffer_length=buffer_length, rate=in_rate, test_duration=test_duration, wave=wave, freq=sine_frequency,
gap_frequency=gap_frequency, gap_threshold=gap_threshold, control_dump_filename="itac_test_01a_control.dump",
tags="instrument=H1,channel-name=LSC-STRAIN,units=strain")
head = tee = pipeparts.mktee(pipeline, head)
head = pipeparts.mkqueue(pipeline,
pipeparts.mkitac(pipeline, head, peak_window, "test_bank.xml", autocorrelation_matrix=numpy.array([[0 + 0.j, 0 + 0.j, 1 + 1.j, 0 + 0.j, 0 + 0.j]])))
head = pipeparts.mkprogressreport(pipeline, head, "test")
#
# output the before and after
#
a = pipeparts.mkappsink(pipeline, pipeparts.mkqueue(pipeline, head))
outfile = open("itac_test_01a_out.dump", "w")
def dump_triggers(elem, output=outfile):
for row in GSTLALSnglInspiral.from_buffer(elem.emit("pull-buffer")):
print(row.end_time + row.end_time_ns * 1e-9, row.snr, row.chisq, row.chisq_dof, file=outfile)
a.connect_after("new-buffer", dump_triggers)
pipeparts.mknxydumpsink(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkqueue(pipeline, tee)), "itac_test_01a_in.dump")
#
# done
#
# if "GST_DEBUG_DUMP_DOT_DIR" in os.environ:
# gst.DEBUG_BIN_TO_DOT_FILE(pipeline, gst.DEBUG_GRAPH_SHOW_ALL, "peak_test_01a")
return pipeline
class TestITAC:
"""Test class group for itac"""
def test_itac(self):
"""Test for itac"""
common.build_and_run(peak_test_01a, "peak_test_01a")
This diff is collapsed.
# Configuration file for pytest within gstlal-ugly
[pytest]
norecursedirs = gst/python share tests
testpaths = tests/tests_pytest python
addopts =
-v
--doctest-modules
--ignore gst/python
--ignore tests
#!/usr/bin/env python3
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
from gstlal import pipeparts
from utils import common
#
# =============================================================================
#
# Pipelines
#
# =============================================================================
#
#
# test rate faker
#
def ratefaker_test_01a(pipeline, dummy):
#
# try changing these. test should still work!
#
wave = 0 # gst-inspect audiotestsrc
freq = 25 # Hz
in_rate = 2048 # Hz
out_rate = 1024 # Hz
gap_frequency = 1.0 # Hz
gap_threshold = 0.8 # of 1
buffer_length = 1.0 # seconds
test_duration = 200.0 # seconds
#
# build pipeline
#
head = common.gapped_test_src(pipeline, buffer_length=buffer_length, rate=in_rate, test_duration=test_duration, wave=wave, freq=freq, gap_frequency=gap_frequency,
gap_threshold=gap_threshold)
head = pipeparts.mktee(pipeline, head)
pipeparts.mknxydumpsink(pipeline, head, "in.txt")
head = pipeparts.mkprogressreport(pipeline, head, "progress_src")
head = pipeparts.mkgeneric(pipeline, head, "audioratefaker")
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw-float, rate=%d" % out_rate)
pipeparts.mknxydumpsink(pipeline, head, "out.txt")
#
# done
#
return pipeline
class TestRateFaker:
"""Class to group rate faker tests"""
def test_rate_faker(self):
"""Test rate faker"""
common.build_and_run(ratefaker_test_01a, "ratefaker_test_01a")
This diff is collapsed.
...@@ -3,4 +3,5 @@ utilitiesdir = $(pkgpythondir)/utilities ...@@ -3,4 +3,5 @@ utilitiesdir = $(pkgpythondir)/utilities
utilities_PYTHON = \ utilities_PYTHON = \
__init__.py \ __init__.py \
admin.py admin.py \
testtools.py
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment