Skip to content
Snippets Groups Projects
Commit ea88cf1d authored by Aaron Viets's avatar Aaron Viets
Browse files

calibration_parts.py: added high-pass, low-pass, band-pass, and band-stop filtering functions

parent bc4c5318
No related branches found
No related tags found
No related merge requests found
......@@ -164,6 +164,90 @@ def removeDC(pipeline, head, caps):
return mkadder(pipeline, list_srcs(pipeline, head, DC))
def lowpass(pipeline, head, rate, length = 1.0, fcut = 500):
length *= rate
if not length % 2:
length += 1 # Make sure the filter length is odd
# Compute a low-pass filter.
lowpass = numpy.sinc(2 * float(fcut) / rate * (numpy.arange(length) - (length - 1) / 2))
lowpass *= numpy.blackman(length)
lowpass /= numpy.sum(lowpass)
# Now apply the filter
return pipeparts.mkfirbank(pipeline, head, latency = int((length - 1) / 2), fir_matrix = [lowpass], time_domain = True)
def highpass(pipeline, head, rate, length = 1.0, fcut = 9.0):
length *= rate
if not length % 2:
length += 1 # Make sure the filter length is odd
# Compute a low-pass filter.
lowpass = numpy.sinc(2 * float(fcut) / rate * (numpy.arange(length) - (length - 1) / 2))
lowpass *= numpy.blackman(length)
lowpass /= numpy.sum(lowpass)
# Create a high-pass filter from the low-pass filter through spectral inversion.
highpass = -lowpass
highpass[(length - 1) / 2] += 1
# Now apply the filter
return pipeparts.mkfirbank(pipeline, head, latency = int((length - 1) / 2), fir_matrix = [highpass], time_domain = True)
def bandpass(pipeline, head, rate, length = 1.0, f_low = 100, f_high = 400):
length *= int(rate / 2)
if not length % 2:
length += 1 # Make sure the filter length is odd
# Compute a temporary low-pass filter.
lowpass = numpy.sinc(2 * float(f_low) / rate * (numpy.arange(length) - (length - 1) / 2))
lowpass *= numpy.blackman(length)
lowpass /= numpy.sum(lowpass)
# Create the high-pass filter from the low-pass filter through spectral inversion.
highpass = -lowpass
highpass[(length - 1) / 2] += 1
# Compute the low-pass filter.
lowpass = numpy.sinc(2 * float(f_high) / rate * (numpy.arange(length) - (length - 1) / 2))
lowpass *= numpy.blackman(length)
lowpass /= numpy.sum(lowpass)
# Convolve the high-pass and low-pass filters to make a band-pass filter
bandpass = numpy.convolve(highpass, lowpass)
# Now apply the filter
return pipeparts.mkfirbank(pipeline, head, latency = int(length - 1), fir_matrix = [bandpass], time_domain = True)
def bandstop(pipeline, head, rate, length = 1.0, f_low = 100, f_high = 400):
length *= int(rate / 2)
if not length % 2:
length += 1 # Make sure the filter length is odd
# Compute a temporary low-pass filter.
lowpass = numpy.sinc(2 * float(f_low) / rate * (numpy.arange(length) - (length - 1) / 2))
lowpass *= numpy.blackman(length)
lowpass /= numpy.sum(lowpass)
# Create a high-pass filter from the low-pass filter through spectral inversion.
highpass = -lowpass
highpass[(length - 1) / 2] += 1
# Compute a low-pass filter.
lowpass = numpy.sinc(2 * float(f_high) / rate * (numpy.arange(length) - (length - 1) / 2))
lowpass *= numpy.blackman(length)
lowpass /= numpy.sum(lowpass)
# Convolve the high-pass and low-pass filters to make a temporary band-pass filter
bandpass = numpy.convolve(highpass, lowpass)
# Create a band-stop filter from the band-pass filter through spectral inversion.
bandstop = -bandpass
bandstop[length - 1] += 1
# Now apply the filter
return pipeparts.mkfirbank(pipeline, head, latency = int(length - 1), fir_matrix = [bandstop], time_domain = True)
#
# Calibration factor related functions
#
......
#!/usr/bin/env python
# Copyright (C) 2017 Aaron Viets
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import numpy
import sys
from gstlal import pipeparts
from gstlal import calibration_parts
import test_common
from gi.repository import Gst
#
# =============================================================================
#
# Utilities
#
# =============================================================================
#
def fir_matrix_update(elem, arg, filtered):
filtered.set_property("kernel", elem.get_property("fir_matrix")[0][::-1])
print("fir matrix updated")
#
# =============================================================================
#
# Pipelines
#
# =============================================================================
#
def lal_fcc_update_01(pipeline, name):
#
# This test passes an impulse through the fcc_updater
#
data_rate = 16384 # Hz
fcc_rate = 16 # Hz
fcc_default = 360 # Hz
fcc_update = 345 # Hz
fcc_averaging_time = 5 # seconds
fcc_filter_duration = 1 # seconds
fcc_filter_taper_length = 32768 # seconds
impulse_separation = 1.0 # seconds
buffer_length = 1.0 # seconds
test_duration = 10.0 # seconds
#
# build pipeline
#
src = pipeparts.mktee(pipeline, test_common.test_src(pipeline, buffer_length = buffer_length, wave = 0, freq = 1.0 / impulse_separation, rate = data_rate, test_duration = test_duration, width = 64))
impulses = calibration_parts.mkinsertgap(pipeline, src, bad_data_intervals = [0.999999999, 1.00000001], block_duration = buffer_length * Gst.SECOND)
impulses = pipeparts.mktee(pipeline, impulses)
pipeparts.mknxydumpsink(pipeline, impulses, "%s_impulses.txt" % name)
fcc = calibration_parts.mkresample(pipeline, src, 0, False, "audio/x-raw,format=F64LE,rate=%d" % fcc_rate)
fcc = pipeparts.mkpow(pipeline, fcc, exponent = 0.0)
fcc = pipeparts.mkgeneric(pipeline, fcc, "lal_add_constant", value = fcc_update - 1)
fcc = pipeparts.mktee(pipeline, fcc)
pipeparts.mknxydumpsink(pipeline, fcc, "%s_fcc.txt" % name)
update_fcc = pipeparts.mkgeneric(pipeline, fcc, "lal_fcc_update", data_rate = data_rate, fcc_rate = fcc_rate, fcc_model = fcc_default, averaging_time = fcc_averaging_time, filter_duration = fcc_filter_duration)
pipeparts.mkfakesink(pipeline, update_fcc)
default_fir_matrix = numpy.zeros(int(numpy.floor(data_rate * fcc_filter_duration / 2.0 + 1) * 2.0 - 2.0))
latency = int(data_rate * fcc_filter_duration / 2.0 + 1)
default_fir_matrix[latency] = 1.0
res = pipeparts.mkgeneric(pipeline, impulses, "lal_tdwhiten", kernel = default_fir_matrix[::-1], latency = latency, taper_length = fcc_filter_taper_length)
update_fcc.connect("notify::fir-matrix", fir_matrix_update, res)
pipeparts.mknxydumpsink(pipeline, res, "%s_out.txt" % name)
#
# done
#
return pipeline
def lal_fcc_update_02(pipeline, name):
#
# This test passes a sinusoid through the fcc_updater
#
data_rate = 16384 # Hz
data_frequency = 1024 # Hz
fcc_rate = 16 # Hz
fcc_default = 360 # Hz
fcc_update = 345 # Hz
fcc_averaging_time = 5 # seconds
fcc_filter_duration = 1 # seconds
fcc_filter_taper_length = 32768 # seconds
buffer_length = 1.0 # seconds
test_duration = 10.0 # seconds
#
# build pipeline
#
src = pipeparts.mktee(pipeline, test_common.test_src(pipeline, buffer_length = buffer_length, wave = 0, freq = data_frequency, rate = data_rate, test_duration = test_duration, width = 64))
sinusoid = pipeparts.mktee(pipeline, src)
pipeparts.mknxydumpsink(pipeline, sinusoid, "%s_sinusoid.txt" % name)
fcc = calibration_parts.mkresample(pipeline, src, 0, False, "audio/x-raw,format=F64LE,rate=%d" % fcc_rate)
fcc = pipeparts.mkpow(pipeline, fcc, exponent = 0.0)
fcc = pipeparts.mkgeneric(pipeline, fcc, "lal_add_constant", value = fcc_update - 1)
fcc = pipeparts.mktee(pipeline, fcc)
pipeparts.mknxydumpsink(pipeline, fcc, "%s_fcc.txt" % name)
update_fcc = pipeparts.mkgeneric(pipeline, fcc, "lal_fcc_update", data_rate = data_rate, fcc_rate = fcc_rate, fcc_model = fcc_default, averaging_time = fcc_averaging_time, filter_duration = fcc_filter_duration)
pipeparts.mkfakesink(pipeline, update_fcc)
default_fir_matrix = numpy.zeros(int(numpy.floor(data_rate * fcc_filter_duration / 2.0 + 1) * 2.0 - 2.0))
latency = int(data_rate * fcc_filter_duration / 2.0 + 1)
default_fir_matrix[latency] = 1.0
res = pipeparts.mkgeneric(pipeline, sinusoid, "lal_tdwhiten", kernel = default_fir_matrix[::-1], latency = latency, taper_length = fcc_filter_taper_length)
update_fcc.connect("notify::fir-matrix", fir_matrix_update, res)
pipeparts.mknxydumpsink(pipeline, res, "%s_out.txt" % name)
#
# done
#
return pipeline
#
# =============================================================================
#
# Main
#
# =============================================================================
#
test_common.build_and_run(lal_fcc_update_01, "lal_fcc_update_01")
test_common.build_and_run(lal_fcc_update_02, "lal_fcc_update_02")
#!/usr/bin/env python
# Copyright (C) 2017 Aaron Viets
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import numpy
import sys
from gstlal import pipeparts
from gstlal import calibration_parts
import test_common
from gi.repository import Gst
#
# =============================================================================
#
# Pipelines
#
# =============================================================================
#
def lal_transferfunction_01(pipeline, name):
#
# This test adds various noise into a stream and uses lal_transferfunction to remove it
#
rate = 16384 # Hz
buffer_length = 1.0 # seconds
test_duration = 3000.0 # seconds
width = 64 # bits
#
# build pipeline
#
hoft = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.001, rate = rate, test_duration = test_duration, width = width, verbose = False)
hoft = pipeparts.mktee(pipeline, hoft)
common_noise = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, freq = 512, volume = 1, rate = rate, test_duration = test_duration, width = width, verbose = False)
common_noise = pipeparts.mktee(pipeline, common_noise)
noise1 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, freq = 512, volume = 1, rate = rate, test_duration = test_duration, width = width, verbose = False)
noise1 = calibration_parts.mkadder(pipeline, calibration_parts.list_srcs(pipeline, common_noise, noise1))
noise1 = pipeparts.mktee(pipeline, noise1)
noise1_for_cleaning = pipeparts.mkgeneric(pipeline, noise1, "identity")
hoft_noise1 = pipeparts.mkshift(pipeline, noise1, shift = 00000000)
hoft_noise1 = pipeparts.mkaudioamplify(pipeline, hoft_noise1, 2)
hoft_noise1 = pipeparts.mktee(pipeline, hoft_noise1)
noise2 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, freq = 1024, volume = 1, rate = rate, test_duration = test_duration, width = width, verbose = False)
noise2 = calibration_parts.mkadder(pipeline, calibration_parts.list_srcs(pipeline, common_noise, noise2))
noise2 = pipeparts.mktee(pipeline, noise2)
noise2_for_cleaning = pipeparts.mkgeneric(pipeline, noise2, "identity")
hoft_noise2 = pipeparts.mkshift(pipeline, noise2, shift = 00000)
hoft_noise2 = pipeparts.mkaudioamplify(pipeline, hoft_noise2, 3)
hoft_noise2 = pipeparts.mktee(pipeline, hoft_noise2)
noisy_hoft = calibration_parts.mkadder(pipeline, calibration_parts.list_srcs(pipeline, hoft, hoft_noise1, hoft_noise2))
noisy_hoft = pipeparts.mktee(pipeline, noisy_hoft)
clean_hoft = calibration_parts.clean_data(pipeline, calibration_parts.list_srcs(pipeline, noisy_hoft, noise1_for_cleaning, noise2_for_cleaning), rate / 4, rate / 8, 16384, test_duration * rate)
clean_hoft = pipeparts.mktee(pipeline, clean_hoft)
# hoft_inv = pipeparts.mkpow(pipeline, hoft, exponent = -1.0)
# clean_hoft_over_hoft = calibration_parts.mkmultiplier(pipeline, calibration_parts.list_srcs(pipeline, hoft_inv, clean_hoft))
# pipeparts.mknxydumpsink(pipeline, clean_hoft_over_hoft, "%s_clean_hoft_over_hoft.txt" % name)
pipeparts.mknxydumpsink(pipeline, hoft, "%s_hoft.txt" % name)
pipeparts.mknxydumpsink(pipeline, hoft_noise1, "%s_hoft_noise1.txt" % name)
pipeparts.mknxydumpsink(pipeline, hoft_noise2, "%s_hoft_noise2.txt" % name)
pipeparts.mknxydumpsink(pipeline, noisy_hoft, "%s_noisy_hoft.txt" % name)
pipeparts.mknxydumpsink(pipeline, clean_hoft, "%s_clean_hoft.txt" % name)
#
# done
#
return pipeline
def lal_transferfunction_02(pipeline, name):
#
# This test produces simple multi-channel data to be read into lal_transferfunction
#
rate = 16384 # Hz
buffer_length = 1.0 # seconds
test_duration = 100.0 # seconds
width = 64 # bits
channels = 1
#
# build pipeline
#
hoft = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.1, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
hoft2 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.1, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
hoft3 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.1, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
hoft4 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.1, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
# hoft5 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.1, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
# hoft6 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.1, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
hoft = calibration_parts.mkinterleave(pipeline, calibration_parts.list_srcs(pipeline, hoft, hoft2, hoft3, hoft4))
pipeparts.mkgeneric(pipeline, hoft, "lal_transferfunction", fft_length = rate / 4, fft_overlap = rate / 8, num_ffts = 256, update_samples = test_duration * rate, filename = "transferfunction.txt", make_fir_filters = True)
return pipeline
def lal_transferfunction_03(pipeline, name):
#
# This test produces two-channel data to be read into lal_transferfunction
#
rate = 16384 # Hz
buffer_length = 1.0 # seconds
test_duration = 100.0 # seconds
width = 64 # bits
channels = 1
#
# build pipeline
#
hoft = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 1, freq = 512, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
# hoftadd = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 0, volume = 1, freq = 2048, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
# hoft = calibration_parts.mkadder(pipeline, calibration_parts.list_srcs(pipeline, hoft, hoftadd))
hoft = pipeparts.mktee(pipeline, hoft)
difference = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.001, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
difference = pipeparts.mktee(pipeline, difference)
difference2 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.001, freq = 4096, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
# difference2 = pipeparts.mktee(pipeline, difference2)
hoft2 = calibration_parts.mkadder(pipeline, calibration_parts.list_srcs(pipeline, hoft, difference))
# hoft2 = pipeparts.mkshift(pipeline, hoft2, shift = 305176)
# hoft2 = pipeparts.mkaudioamplify(pipeline, hoft2, -1)
hoft2 = pipeparts.mktee(pipeline, hoft2)
hoft3 = calibration_parts.mkadder(pipeline, calibration_parts.list_srcs(pipeline, hoft, difference2))
hoft3 = pipeparts.mktee(pipeline, hoft3)
# hoft3 = test_common.test_src(pipeline, buffer_length = buffer_length, wave = 5, volume = 0.1, channels = channels, rate = rate, test_duration = test_duration, width = width, verbose = False)
clean_data = calibration_parts.clean_data(pipeline, calibration_parts.list_srcs(pipeline, hoft, hoft2, hoft3), rate / 8, rate / 16, 32, rate * 100)
pipeparts.mknxydumpsink(pipeline, hoft, "%s_hoft.txt" % name)
pipeparts.mknxydumpsink(pipeline, hoft2, "%s_hoft2.txt" % name)
pipeparts.mknxydumpsink(pipeline, difference, "%s_difference.txt" % name)
# pipeparts.mknxydumpsink(pipeline, difference2, "%s_difference2.txt" % name)
pipeparts.mknxydumpsink(pipeline, clean_data, "%s_out.txt" % name)
return pipeline
#
# =============================================================================
#
# Main
#
# =============================================================================
#
#test_common.build_and_run(lal_transferfunction_01, "lal_transferfunction_01")
test_common.build_and_run(lal_transferfunction_02, "lal_transferfunction_02")
#test_common.build_and_run(lal_transferfunction_03, "lal_transferfunction_03")
#!/usr/bin/env python
# Copyright (C) 2016 Aaron Viets
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import numpy
import sys
from gstlal import pipeparts
from gstlal import calibration_parts
import test_common
#
# =============================================================================
#
# Pipelines
#
# =============================================================================
#
def pyfilter_test_01(pipeline, name):
#
# This test removes the DC component from a stream of ones (i.e., the result should be zero)
#
rate = 16384 # Hz
buffer_length = 1.0 # seconds
test_duration = 10.0 # seconds
DC = 1.0
wave = 0
freq = 90
volume = 1.0
#
# build pipeline
#
src = test_common.test_src(pipeline, buffer_length = buffer_length, rate = rate, freq = freq, test_duration = test_duration, wave = wave, width = 64)
head = pipeparts.mkaudioamplify(pipeline, src, volume)
head = pipeparts.mkgeneric(pipeline, head, "lal_add_constant", value = DC)
head = pipeparts.mktee(pipeline, head)
pipeparts.mknxydumpsink(pipeline, head, "%s_in.txt" % name)
head = calibration_parts.bandstop(pipeline, head, rate)
pipeparts.mknxydumpsink(pipeline, head, "%s_out.txt" % name)
#
# done
#
return pipeline
#
# =============================================================================
#
# Main
#
# =============================================================================
#
test_common.build_and_run(pyfilter_test_01, "pyfilter_test_01")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment