Skip to content
Snippets Groups Projects
Commit 3ae34c5e authored by Patrick Godwin's avatar Patrick Godwin Committed by ChiWai Chan
Browse files

migrate multirate_datasource.py to pipeparts/condition.py, split into condition() and multiband()

parent b80828dc
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !114. Comments created here will be created in the context of that merge request.
......@@ -22,7 +22,6 @@ pkgpython_PYTHON = \
kernels.py \
matplotlibhelper.py \
misc.py \
multirate_datasource.py \
pipeio.py \
pipeline.py \
pipeutil.py \
......
......@@ -3,6 +3,7 @@ pipepartsdir = $(pkgpythondir)/pipeparts
pipeparts_PYTHON = \
__init__.py \
condition.py \
encode.py \
filters.py \
mux.py \
......
......@@ -32,9 +32,7 @@ import numpy
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GstAudio
GObject.threads_init()
Gst.init(None)
from gi.repository import GObject, Gst
import lal
from ligo import segments
......@@ -42,6 +40,7 @@ from ligo import segments
from gstlal import bottle
from gstlal import kernels
from gstlal import pipeparts
from gstlal import plugins
from gstlal import datasource
from gstlal.psd import interpolate_psd
......@@ -67,13 +66,13 @@ except KeyError as e:
raise
def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_fft_length = 32, ht_gate_threshold = float("inf"), veto_segments = None, nxydump_segment = None, track_psd = False, block_duration = 1 * Gst.SECOND, zero_pad = None, width = 64, unit_normalize = True, statevector = None, dqvector = None, fir_whiten_reference_psd = None, track_latency = False):
def mkcondition(pipeline, src, target_rate, instrument, psd = None, psd_fft_length = 32, ht_gate_threshold = float("inf"), veto_segments = None, nxydump_segment = None, track_psd = False, block_duration = 1 * Gst.SECOND, zero_pad = None, width = 64, statevector = None, dqvector = None, fir_whiten_reference_psd = None):
"""
Build pipeline stage to whiten and downsample h(t).
* pipeline: the gstreamer pipeline to add this to
* src: the gstreamer element that will be providing data to this
* rates: a list of the requested sample rates, e.g., [512,1024].
* target_rate: the requested sample rate.
* instrument: the instrument to process
* psd: a psd frequency series
* psd_fft_length: length of fft used for whitening
......@@ -184,10 +183,10 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
# sample rate of your data source.
#
head = pipeparts.mkcapsfilter(pipeline, src, "audio/x-raw, rate=[%d,MAX]" % max(rates))
head = pipeparts.mkcapsfilter(pipeline, src, f"audio/x-raw, rate=[{target_rate:d},MAX]")
head = pipeparts.mkinterpolator(pipeline, head)
head = pipeparts.mkaudioconvert(pipeline, head)
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_hoft" % (instrument, max(rates)))
head = pipeparts.mkchecktimestamps(pipeline, head, f"{instrument}_timestamps_{target_rate:d}_hoft")
#
# construct whitener.
......@@ -200,12 +199,12 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
# high pass filter
kernel = kernels.one_second_highpass_kernel(max(rates), cutoff = 12)
block_stride = block_duration * max(rates) // Gst.SECOND
block_stride = block_duration * target_rate // Gst.SECOND
assert len(kernel) % 2 == 1, "high-pass filter length is not odd"
head = pipeparts.mkfirbank(pipeline, head, fir_matrix = numpy.array(kernel, ndmin = 2), block_stride = block_stride, time_domain = False, latency = (len(kernel) - 1) // 2)
# FIR filter for whitening kernel
head = pipeparts.mktdwhiten(pipeline, head, kernel = numpy.zeros(1 + max(rates) * psd_fft_length, dtype=numpy.float64), latency = 0)
head = pipeparts.mktdwhiten(pipeline, head, kernel = numpy.zeros(1 + target_rate * psd_fft_length, dtype=numpy.float64), latency = 0)
# compute whitening kernel from PSD
def set_fir_psd(whiten, pspec, firelem, psd_fir_kernel):
......@@ -233,9 +232,9 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
# confirm that the reference phase PSD's Nyquist is
# sufficiently high, then reduce it to the required
# Nyquist if needed.
assert (psd_fft_length * max(rates)) // 2 + 1 <= len(fir_whiten_reference_psd.data.data), "fir_whiten_reference_psd Nyquist too low"
if (psd_fft_length * max(rates)) // 2 + 1 < len(fir_whiten_reference_psd.data.data):
fir_whiten_reference_psd = lal.CutREAL8FrequencySeries(fir_whiten_reference_psd, 0, (psd_fft_length * max(rates)) // 2 + 1)
assert (psd_fft_length * target_rate) // 2 + 1 <= len(fir_whiten_reference_psd.data.data), "fir_whiten_reference_psd Nyquist too low"
if (psd_fft_length * target_rate) // 2 + 1 < len(fir_whiten_reference_psd.data.data):
fir_whiten_reference_psd = lal.CutREAL8FrequencySeries(fir_whiten_reference_psd, 0, (psd_fft_length * target_rate) // 2 + 1)
# set the reference phase PSD
firkernel.set_phase(fir_whiten_reference_psd)
whiten.connect_after("notify::mean-psd", set_fir_psd, head, firkernel)
......@@ -246,7 +245,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
# consumed immediately when needed, so there is no risk
# that these queues add to the latency, so make them
# generously large.
# FIXME the -max(rates) extra padding is for the high pass
# FIXME the -target_rate extra padding is for the high pass
# filter: NOTE it also needs to be big enough for the
# downsampling filter, but that is typically smaller than the
# HP filter (192 samples at Qual 9)
......@@ -255,9 +254,9 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
if statevector is not None or dqvector is not None:
head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * (psd_fft_length + 2))
if statevector is not None:
head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -max(rates), attack_length = -max(rates) * (psd_fft_length + 1))
head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -target_rate, attack_length = -target_rate * (psd_fft_length + 1))
if dqvector is not None:
head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -max(rates), attack_length = -max(rates) * (psd_fft_length + 1))
head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -target_rate, attack_length = -target_rate * (psd_fft_length + 1))
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_fir" % instrument)
#head = pipeparts.mknxydumpsinktee(pipeline, head, filename = "after_mkfirbank.txt")
else:
......@@ -323,12 +322,12 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
head = pipeparts.mkaudioconvert(pipeline, head)
if width == 64:
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64)))
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (target_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64)))
elif width == 32:
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32)))
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (target_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32)))
else:
raise ValueError("invalid width: %d" % width)
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_whitehoft" % (instrument, max(rates)))
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_whitehoft" % (instrument, target_rate))
#
# optionally add vetoes
......@@ -346,7 +345,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
# FIXME: this could be omitted if ht_gate_threshold is None, but
# we need to collect whitened h(t) segments, however something
# could be done to collect those if these gates aren't here.
ht_gate_window = max(max(rates) // 2, 1) # samples
ht_gate_window = max(target_rate // 2, 1) # samples
head = datasource.mkhtgate(pipeline, head, threshold = ht_gate_threshold if ht_gate_threshold is not None else float("+inf"), hold_length = ht_gate_window, attack_length = ht_gate_window, name = "%s_ht_gate" % instrument)
# emit signals so that a user can latch on to them
head.set_property("emit-signals", True)
......@@ -354,6 +353,16 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
if track_latency:
head = pipeparts.mklatency(pipeline, head, name = "%s_whitening_latency" % instrument, silent = True)
return head
def mkmultiband(pipeline, head, rates, unit_normalize = True):
"""
Build pipeline stage to multiband a stream.
* rates: a list of the requested sample rates, e.g., [512,1024].
"""
#
# tee for highest sample rate stream
#
......@@ -407,3 +416,19 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
return head
#
# =============================================================================
#
# Stream Element Registry
#
# =============================================================================
#
@plugins.register
def elements():
return {
"condition": mkcondition,
"multiband": mkmultiband,
}
......@@ -228,6 +228,9 @@ def _get_registered_elements():
# load elements
manager.register(pipeparts)
from gstlal.pipeparts import condition
manager.register(condition)
# add all registered plugins to registry
registered = {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment