diff --git a/gstlal/python/Makefile.am b/gstlal/python/Makefile.am index a2864f3ee8bf2f38e10811cff2b14b25a8e4ff80..a76e1a4009e80836af2ac7694f81c1ba52dd2f15 100644 --- a/gstlal/python/Makefile.am +++ b/gstlal/python/Makefile.am @@ -22,7 +22,6 @@ pkgpython_PYTHON = \ kernels.py \ matplotlibhelper.py \ misc.py \ - multirate_datasource.py \ pipeio.py \ pipeline.py \ pipeutil.py \ diff --git a/gstlal/python/pipeparts/Makefile.am b/gstlal/python/pipeparts/Makefile.am index 4c4495ff5119ba1b3bb6f2f352cc7aed4d46d0de..91c1b8be1cdd487ad390dfc641b2c21f34ba9d8e 100644 --- a/gstlal/python/pipeparts/Makefile.am +++ b/gstlal/python/pipeparts/Makefile.am @@ -3,6 +3,7 @@ pipepartsdir = $(pkgpythondir)/pipeparts pipeparts_PYTHON = \ __init__.py \ + condition.py \ encode.py \ filters.py \ mux.py \ diff --git a/gstlal/python/multirate_datasource.py b/gstlal/python/pipeparts/condition.py similarity index 88% rename from gstlal/python/multirate_datasource.py rename to gstlal/python/pipeparts/condition.py index 938bf0a5744b1c53a1ec01717080cfad03e98455..f8617923cb7444d0482698613cd9526936f7beba 100644 --- a/gstlal/python/multirate_datasource.py +++ b/gstlal/python/pipeparts/condition.py @@ -32,9 +32,7 @@ import numpy import gi gi.require_version('Gst', '1.0') -from gi.repository import GObject, Gst, GstAudio -GObject.threads_init() -Gst.init(None) +from gi.repository import GObject, Gst import lal from ligo import segments @@ -42,6 +40,7 @@ from ligo import segments from gstlal import bottle from gstlal import kernels from gstlal import pipeparts +from gstlal import plugins from gstlal import datasource from gstlal.psd import interpolate_psd @@ -67,13 +66,13 @@ except KeyError as e: raise -def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_fft_length = 32, ht_gate_threshold = float("inf"), veto_segments = None, nxydump_segment = None, track_psd = False, block_duration = 1 * Gst.SECOND, zero_pad = None, width = 64, unit_normalize = True, statevector = None, dqvector = None, fir_whiten_reference_psd = None, track_latency = False): +def mkcondition(pipeline, src, target_rate, instrument, psd = None, psd_fft_length = 32, ht_gate_threshold = float("inf"), veto_segments = None, nxydump_segment = None, track_psd = False, block_duration = 1 * Gst.SECOND, zero_pad = None, width = 64, statevector = None, dqvector = None, fir_whiten_reference_psd = None): """ Build pipeline stage to whiten and downsample h(t). * pipeline: the gstreamer pipeline to add this to * src: the gstreamer element that will be providing data to this - * rates: a list of the requested sample rates, e.g., [512,1024]. + * target_rate: the requested sample rate. * instrument: the instrument to process * psd: a psd frequency series * psd_fft_length: length of fft used for whitening @@ -184,10 +183,10 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f # sample rate of your data source. # - head = pipeparts.mkcapsfilter(pipeline, src, "audio/x-raw, rate=[%d,MAX]" % max(rates)) + head = pipeparts.mkcapsfilter(pipeline, src, f"audio/x-raw, rate=[{target_rate:d},MAX]") head = pipeparts.mkinterpolator(pipeline, head) head = pipeparts.mkaudioconvert(pipeline, head) - head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_hoft" % (instrument, max(rates))) + head = pipeparts.mkchecktimestamps(pipeline, head, f"{instrument}_timestamps_{target_rate:d}_hoft") # # construct whitener. @@ -200,12 +199,12 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f # high pass filter kernel = kernels.one_second_highpass_kernel(max(rates), cutoff = 12) - block_stride = block_duration * max(rates) // Gst.SECOND + block_stride = block_duration * target_rate // Gst.SECOND assert len(kernel) % 2 == 1, "high-pass filter length is not odd" head = pipeparts.mkfirbank(pipeline, head, fir_matrix = numpy.array(kernel, ndmin = 2), block_stride = block_stride, time_domain = False, latency = (len(kernel) - 1) // 2) # FIR filter for whitening kernel - head = pipeparts.mktdwhiten(pipeline, head, kernel = numpy.zeros(1 + max(rates) * psd_fft_length, dtype=numpy.float64), latency = 0) + head = pipeparts.mktdwhiten(pipeline, head, kernel = numpy.zeros(1 + target_rate * psd_fft_length, dtype=numpy.float64), latency = 0) # compute whitening kernel from PSD def set_fir_psd(whiten, pspec, firelem, psd_fir_kernel): @@ -233,9 +232,9 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f # confirm that the reference phase PSD's Nyquist is # sufficiently high, then reduce it to the required # Nyquist if needed. - assert (psd_fft_length * max(rates)) // 2 + 1 <= len(fir_whiten_reference_psd.data.data), "fir_whiten_reference_psd Nyquist too low" - if (psd_fft_length * max(rates)) // 2 + 1 < len(fir_whiten_reference_psd.data.data): - fir_whiten_reference_psd = lal.CutREAL8FrequencySeries(fir_whiten_reference_psd, 0, (psd_fft_length * max(rates)) // 2 + 1) + assert (psd_fft_length * target_rate) // 2 + 1 <= len(fir_whiten_reference_psd.data.data), "fir_whiten_reference_psd Nyquist too low" + if (psd_fft_length * target_rate) // 2 + 1 < len(fir_whiten_reference_psd.data.data): + fir_whiten_reference_psd = lal.CutREAL8FrequencySeries(fir_whiten_reference_psd, 0, (psd_fft_length * target_rate) // 2 + 1) # set the reference phase PSD firkernel.set_phase(fir_whiten_reference_psd) whiten.connect_after("notify::mean-psd", set_fir_psd, head, firkernel) @@ -246,7 +245,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f # consumed immediately when needed, so there is no risk # that these queues add to the latency, so make them # generously large. - # FIXME the -max(rates) extra padding is for the high pass + # FIXME the -target_rate extra padding is for the high pass # filter: NOTE it also needs to be big enough for the # downsampling filter, but that is typically smaller than the # HP filter (192 samples at Qual 9) @@ -255,9 +254,9 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f if statevector is not None or dqvector is not None: head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * (psd_fft_length + 2)) if statevector is not None: - head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -max(rates), attack_length = -max(rates) * (psd_fft_length + 1)) + head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -target_rate, attack_length = -target_rate * (psd_fft_length + 1)) if dqvector is not None: - head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -max(rates), attack_length = -max(rates) * (psd_fft_length + 1)) + head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -target_rate, attack_length = -target_rate * (psd_fft_length + 1)) head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_fir" % instrument) #head = pipeparts.mknxydumpsinktee(pipeline, head, filename = "after_mkfirbank.txt") else: @@ -323,12 +322,12 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f head = pipeparts.mkaudioconvert(pipeline, head) if width == 64: - head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64))) + head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (target_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64))) elif width == 32: - head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32))) + head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (target_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32))) else: raise ValueError("invalid width: %d" % width) - head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_whitehoft" % (instrument, max(rates))) + head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_whitehoft" % (instrument, target_rate)) # # optionally add vetoes @@ -346,7 +345,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f # FIXME: this could be omitted if ht_gate_threshold is None, but # we need to collect whitened h(t) segments, however something # could be done to collect those if these gates aren't here. - ht_gate_window = max(max(rates) // 2, 1) # samples + ht_gate_window = max(target_rate // 2, 1) # samples head = datasource.mkhtgate(pipeline, head, threshold = ht_gate_threshold if ht_gate_threshold is not None else float("+inf"), hold_length = ht_gate_window, attack_length = ht_gate_window, name = "%s_ht_gate" % instrument) # emit signals so that a user can latch on to them head.set_property("emit-signals", True) @@ -354,6 +353,16 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f if track_latency: head = pipeparts.mklatency(pipeline, head, name = "%s_whitening_latency" % instrument, silent = True) + return head + + +def mkmultiband(pipeline, head, rates, unit_normalize = True): + """ + Build pipeline stage to multiband a stream. + + * rates: a list of the requested sample rates, e.g., [512,1024]. + + """ # # tee for highest sample rate stream # @@ -407,3 +416,19 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f return head + +# +# ============================================================================= +# +# Stream Element Registry +# +# ============================================================================= +# + + +@plugins.register +def elements(): + return { + "condition": mkcondition, + "multiband": mkmultiband, + } diff --git a/gstlal/python/stream.py b/gstlal/python/stream.py index 881274cc4e6308db44855ee39fd7cb83cdf3cfb0..c44071d05d64580ad8029175c9c3654d53fa21e0 100644 --- a/gstlal/python/stream.py +++ b/gstlal/python/stream.py @@ -228,6 +228,9 @@ def _get_registered_elements(): # load elements manager.register(pipeparts) + + from gstlal.pipeparts import condition + manager.register(condition) # add all registered plugins to registry registered = {}