Forked from
lscsoft / GstLAL
2202 commits behind the upstream repository.
-
Patrick Godwin authoredPatrick Godwin authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
multirate_datasource.py 9.12 KiB
# Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel
# Copyright (C) 2017--2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import sys
import os
import optparse
import math
import numpy
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstAudio', '1.0')
from gi.repository import GObject, Gst, GstAudio
GObject.threads_init()
Gst.init(None)
import lal
from gstlal import bottle
from gstlal import pipeparts
from gstlal import reference_psd
from gstlal import datasource
# FIXME: Find a better way than using global variables.
PSD_FFT_LENGTH = 32
NATIVE_RATE_CUTOFF = 128
#
# =============================================================================
#
# Pipeline Parts
#
# =============================================================================
#
def mkwhitened_multirate_src(pipeline, src, rates, native_rate, instrument, psd = None, psd_fft_length = PSD_FFT_LENGTH, veto_segments = None, nxydump_segment = None, track_psd = True, block_duration = int(1 * Gst.SECOND), width = 64, channel_name = "hoft"):
"""!
Build pipeline stage to whiten and downsample auxiliary channels.
- pipeline: the gstreamer pipeline to add this to
- src: the gstreamer element that will be providing data to this
- rates: a list of the requested sample rates, e.g., [512,1024].
- native_rate: Native sampling rate of channel
- instrument: the instrument to process
- psd: a psd frequency series
- psd_fft_length: length of fft used for whitening
- veto_segments: segments to mark as gaps after whitening
- nxydump_segment: segment to dump to disk after whitening
- track_psd: decide whether to dynamically track the spectrum or use the fixed spectrum provided
- width: type convert to either 32 or 64 bit float
- channel_name: channel to whiten and downsample
**Gstreamer graph describing this function**
.. graphviz::
digraph mkwhitened_multirate_src {
rankdir = LR;
compound=true;
node [shape=record fontsize=10 fontname="Verdana"];
edge [fontsize=8 fontname="Verdana"];
capsfilter1;
interpolator;
highpass;
whiten;
audioconvert;
capsfilter2;
tee;
segmentsrcgate;
audioamplifyr1;
audioamplifyr2;
audioamplifyrn;
interpolatorr1;
interpolatorr2;
interpolatorrn;
capsfilterr1;
capsfilterr2;
capsfilterrn;
// nodes
"?" -> capsfilter1 -> interpolator -> whiten;
highpass -> whiten -> audioconvert -> capsfilter2 -> tee;
tee -> segmentsrcgate [label="Whitened timeseries to disk"];
tee -> audioamplifyr1 [label="Rate 1"];
audioamplifyr1 -> interpolatorr1;
interpolatorr1 -> capsfilterr1 -> "? 1";
tee -> audioamplifyr2 [label="Rate 2"];
audioamplifyr2 -> interpolatorr2;
interpolatorr2 -> capsfilterr2 -> "? 2";
tee -> audioamplifyrn [label="Rate N"];
audioamplifyrn -> interpolatorrn;
interpolatorrn -> capsfilterrn -> "? N";
}
"""
#
# input sanity checks
#
if psd is None and not track_psd:
raise ValueError("must enable track_psd when psd is None")
if int(psd_fft_length) != psd_fft_length:
raise ValueError("psd_fft_length must be an integer")
psd_fft_length = int(psd_fft_length)
#
# down-sample to highest of target sample rates.
#
quality = 10 # set by interpolator
max_rate = max(rates)
head = pipeparts.mkcapsfilter(pipeline, src, "audio/x-raw, rate=[%d,MAX]" % max_rate)
head = pipeparts.mkinterpolator(pipeline, head)
head = pipeparts.mkaudioconvert(pipeline, head)
#
# construct whitener.
#
zero_pad = psd_fft_length // 4
head = pipeparts.mktee(pipeline, head)
whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_%s_lalwhiten" % (instrument, channel_name))
pipeparts.mkfakesink(pipeline, whiten)
#
# high pass filter
#
block_stride = block_duration * max_rate // Gst.SECOND
if native_rate >= NATIVE_RATE_CUTOFF:
kernel = reference_psd.one_second_highpass_kernel(max_rate, cutoff = 12)
assert len(kernel) % 2 == 1, "high-pass filter length is not odd"
head = pipeparts.mkfirbank(pipeline, head, fir_matrix = numpy.array(kernel, ndmin = 2), block_stride = block_stride, time_domain = False, latency = (len(kernel) - 1) // 2)
#
# FIR filter for whitening kernel
#
head = pipeparts.mkfirbank(pipeline, head, fir_matrix = numpy.zeros((1, 1 + max_rate * psd_fft_length), dtype=numpy.float64), block_stride = block_stride, time_domain = False, latency = 0)
#
# compute whitening kernel from PSD
#
def set_fir_psd(whiten, pspec, firelem, psd_fir_kernel):
psd_data = numpy.array(whiten.get_property("mean-psd"))
psd = lal.CreateREAL8FrequencySeries(
name = "psd",
epoch = lal.LIGOTimeGPS(0),
f0 = 0.0,
deltaF = whiten.get_property("delta-f"),
sampleUnits = lal.Unit(whiten.get_property("psd-units")),
length = len(psd_data)
)
psd.data.data = psd_data
kernel, latency, sample_rate = psd_fir_kernel.psd_to_linear_phase_whitening_fir_kernel(psd)
kernel, theta = psd_fir_kernel.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, sample_rate)
kernel -= numpy.mean(kernel) # subtract DC offset from signal
firelem.set_property("fir-matrix", numpy.array(kernel, ndmin = 2))
whiten.connect_after("notify::mean-psd", set_fir_psd, head, reference_psd.PSDFirKernel())
#
# Drop initial data to let the PSD settle
#
head = pipeparts.mkdrop(pipeline, head, drop_samples = 16 * psd_fft_length * max_rate)
#
# enable/disable PSD tracking
#
whiten.set_property("psd-mode", 0 if track_psd else 1)
#
# install signal handler to retrieve \Delta f and f_{Nyquist}
# whenever they are known and/or change, resample the user-supplied
# PSD, and install it into the whitener.
#
if psd is not None:
def psd_units_or_resolution_changed(elem, pspec, psd):
# make sure units are set, compute scale factor
units = lal.Unit(elem.get_property("psd-units"))
if units == lal.DimensionlessUnit:
return
scale = float(psd.sampleUnits / units)
# get frequency resolution and number of bins
delta_f = elem.get_property("delta-f")
n = int(round(elem.get_property("f-nyquist") / delta_f) + 1)
# interpolate, rescale, and install PSD
psd = reference_psd.interpolate_psd(psd, delta_f)
elem.set_property("mean-psd", psd.data.data[:n] * scale)
whiten.connect_after("notify::f-nyquist", psd_units_or_resolution_changed, psd)
whiten.connect_after("notify::delta-f", psd_units_or_resolution_changed, psd)
whiten.connect_after("notify::psd-units", psd_units_or_resolution_changed, psd)
#
# convert to desired precision
#
head = pipeparts.mkaudioconvert(pipeline, head)
if width == 64:
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64)))
elif width == 32:
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32)))
else:
raise ValueError("invalid width: %d" % width)
#
# optionally add vetoes
#
if veto_segments is not None:
head = datasource.mksegmentsrcgate(pipeline, head, veto_segments, invert_output=True)
#
# if segments are specified to dump whitened timeseries, do so
#
tee = pipeparts.mktee(pipeline, head)
if nxydump_segment:
pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, tee), "whitenedtimeseries_%s.txt" % channel_name, segment = nxydump_segment)
#
# tee for highest sample rate stream
#
head = {rate: None for rate in rates}
head[max_rate] = pipeparts.mkqueue(pipeline, tee, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 8)
#
# down-sample whitened time series to remaining target sample rates
#
for rate in sorted(set(rates))[:-1]:
head[rate] = pipeparts.mkqueue(pipeline, tee, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 8)
head[rate] = pipeparts.mkaudioamplify(pipeline, head[rate], 1. / math.sqrt(pipeparts.audioresample_variance_gain(quality, max_rate, rate)))
head[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkinterpolator(pipeline, head[rate]), caps = "audio/x-raw, rate=%d" % rate)
#
# return value is a dictionary of elements indexed by sample rate
#
return head