From 3a69f39bcf7516a8f1c548671f2db4ecc91a2556 Mon Sep 17 00:00:00 2001 From: Aaron Viets <aaron.viets@ligo.org> Date: Wed, 18 Jul 2018 19:37:13 -0700 Subject: [PATCH] lal_insertgap: Now handles multi-channel streams. --- gstlal-calibration/gst/lal/gstlal_insertgap.c | 93 ++++++++----------- gstlal-calibration/gst/lal/gstlal_insertgap.h | 1 + .../python/calibration_parts.py | 8 +- 3 files changed, 47 insertions(+), 55 deletions(-) diff --git a/gstlal-calibration/gst/lal/gstlal_insertgap.c b/gstlal-calibration/gst/lal/gstlal_insertgap.c index 068d785097..5f8efc2616 100644 --- a/gstlal-calibration/gst/lal/gstlal_insertgap.c +++ b/gstlal-calibration/gst/lal/gstlal_insertgap.c @@ -100,7 +100,7 @@ static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE( GST_STATIC_CAPS( "audio/x-raw, " \ "rate = (int) [1, MAX], " \ - "channels = (int) 1, " \ + "channels = (int) [1, MAX], " \ "format = (string) { "GST_AUDIO_NE(U32)", " GST_AUDIO_NE(F32) ", " GST_AUDIO_NE(F64) ", " GST_AUDIO_NE(Z64) ", " GST_AUDIO_NE(Z128) " }, " \ "layout = (string) interleaved, " \ "channel-mask = (bitmask) 0" @@ -115,7 +115,7 @@ static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE( GST_STATIC_CAPS( "audio/x-raw, " \ "rate = (int) [1, MAX], " \ - "channels = (int) 1, " \ + "channels = (int) [1, MAX], " \ "format = (string) { "GST_AUDIO_NE(U32)", " GST_AUDIO_NE(F32) ", " GST_AUDIO_NE(F64) ", " GST_AUDIO_NE(Z64) ", " GST_AUDIO_NE(Z128) " }, " \ "layout = (string) interleaved, " \ "channel-mask = (bitmask) 0" @@ -144,42 +144,29 @@ G_DEFINE_TYPE_WITH_CODE( */ -static gboolean check_data(double complex data, double *bad_data_intervals, gint array_length, gboolean remove_nan, gboolean remove_inf, gboolean complex_data) { - double data_re = creal(data); - gint i; - if(complex_data){ - double data_im = cimag(data); - if(bad_data_intervals) { - if(data_re <= bad_data_intervals[0] || data_re >= bad_data_intervals[array_length - 1] || data_im <= bad_data_intervals[0] || data_im >= bad_data_intervals[array_length - 1]) - return TRUE; - for(i = 1; i < array_length - 1; i += 2) { - if((data_re >= bad_data_intervals[i] && data_re <= bad_data_intervals[i + 1]) || (data_im >= bad_data_intervals[i] && data_im <= bad_data_intervals[i + 1])) - return TRUE; - } - } - if(remove_nan && (isnan(data_re) || isnan(data_im))) - return TRUE; - if(remove_inf && (isinf(data_re) || isinf(data_im))) - return TRUE; - } else { - if(bad_data_intervals) { - if(data_re <= bad_data_intervals[0] || data_re >= bad_data_intervals[array_length - 1]) - return TRUE; - for(i = 1; i < array_length - 1; i += 2) { - if(data_re >= bad_data_intervals[i] && data_re <= bad_data_intervals[i + 1]) - return TRUE; - } - } - if(remove_nan && (isnan(data_re))) - return TRUE; - if(remove_inf && (isinf(data_re))) - return TRUE; - } - - return FALSE; +#define DEFINE_CHECK_DATA(DTYPE) \ +static gboolean check_data_ ## DTYPE(DTYPE *data, double *bad_data_intervals, int array_length, int num_checks, gboolean remove_nan, gboolean remove_inf) { \ + int i, j; \ + gboolean result = TRUE; \ + for(i = 0; i < num_checks; i++) { \ + if(bad_data_intervals) { \ + for(j = 0; j < array_length; j += 2) \ + result &= data[i] > bad_data_intervals[j] && data[i] < bad_data_intervals[j + 1]; \ + } \ + if(remove_nan) \ + result &= !isnan(data[i]); \ + if(remove_inf) \ + result &= !isinf(data[i]); \ + } \ + return result; \ } +DEFINE_CHECK_DATA(float); +DEFINE_CHECK_DATA(double); +DEFINE_CHECK_DATA(guint32); + + #define DEFINE_PROCESS_INBUF(DTYPE,COMPLEX) \ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *indata, DTYPE COMPLEX *outdata, GSTLALInsertGap *element, gboolean sinkbuf_gap, gboolean sinkbuf_discont, guint64 sinkbuf_offset, guint64 sinkbuf_offset_end, GstClockTime sinkbuf_dur, GstClockTime sinkbuf_pts, gboolean complex_data) \ { \ @@ -221,13 +208,13 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind for(buffer_num = 0; buffer_num < standard_blocks; buffer_num++) { \ GstBuffer *discont_buf; \ DTYPE COMPLEX *discont_buf_data; \ - discont_buf_data = g_malloc(max_block_length * sizeof(DTYPE COMPLEX)); \ + discont_buf_data = g_malloc(max_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ guint sample_num; \ - for(sample_num = 0; sample_num < max_block_length; sample_num++) { \ + for(sample_num = 0; sample_num < max_block_length * element->channels; sample_num++) { \ *discont_buf_data = sample_value; \ discont_buf_data++; \ } \ - discont_buf = gst_buffer_new_wrapped((discont_buf_data - max_block_length), max_block_length * sizeof(DTYPE COMPLEX)); \ + discont_buf = gst_buffer_new_wrapped((discont_buf_data - max_block_length * element->channels), max_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ if(G_UNLIKELY(!discont_buf)) { \ GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \ result = GST_FLOW_ERROR; \ @@ -259,13 +246,13 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind if(last_block_length != 0) { \ GstBuffer *last_discont_buf; \ DTYPE COMPLEX *last_discont_buf_data; \ - last_discont_buf_data = g_malloc(last_block_length * sizeof(DTYPE COMPLEX)); \ + last_discont_buf_data = g_malloc(last_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ guint sample_num; \ - for(sample_num = 0; sample_num < last_block_length; sample_num++) { \ + for(sample_num = 0; sample_num < last_block_length * element->channels; sample_num++) { \ *last_discont_buf_data = sample_value; \ last_discont_buf_data++; \ } \ - last_discont_buf = gst_buffer_new_wrapped((last_discont_buf_data - last_block_length), last_block_length * sizeof(DTYPE COMPLEX)); \ + last_discont_buf = gst_buffer_new_wrapped((last_discont_buf_data - last_block_length * element->channels), last_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ if(G_UNLIKELY(!last_discont_buf)) { \ GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \ result = GST_FLOW_ERROR; \ @@ -314,18 +301,18 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind g_assert_cmpuint(max_block_length, >, 0); \ \ /* Check first sample */ \ - data_is_bad = check_data((double complex) *indata, element->bad_data_intervals, element->array_length, element->remove_nan, element->remove_inf, complex_data); \ + data_is_bad = !check_data_ ## DTYPE((DTYPE *) indata, element->bad_data_intervals, element->array_length, (1 + (int) complex_data) * element->channels, element->remove_nan, element->remove_inf); \ srcbuf_gap = (sinkbuf_gap && (!(element->remove_gap))) || ((element->insert_gap) && data_is_bad); \ for(offset = 0; offset < length; offset++) { \ - data_is_bad = check_data((double complex) *indata, element->bad_data_intervals, element->array_length, element->remove_nan, element->remove_inf, complex_data); \ + data_is_bad = !check_data_ ## DTYPE((DTYPE *) indata, element->bad_data_intervals, element->array_length, (1 + (int) complex_data) * element->channels, element->remove_nan, element->remove_inf); \ srcbuf_gap_next = (sinkbuf_gap && (!(element->remove_gap))) || ((element->insert_gap) && data_is_bad); \ if(complex_data) \ *outdata = (((element->replace_value) < G_MAXDOUBLE) && data_is_bad) ? ((DTYPE) (element->replace_value)) * (1 + I) : *indata; \ else \ *outdata = (((element->replace_value) < G_MAXDOUBLE) && data_is_bad) ? (DTYPE) (element->replace_value) : *indata; \ current_srcbuf_length++; \ - indata++; \ - outdata++; \ + indata += element->channels; \ + outdata += element->channels; \ \ /* * We need to push an output buffer if: @@ -343,14 +330,14 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind */ \ offset--; \ current_srcbuf_length--; \ - indata--; \ - outdata--; \ + indata -= element->channels; \ + outdata -= element->channels; \ } \ GstBuffer *srcbuf; \ DTYPE COMPLEX *srcbuf_data; \ - srcbuf_data = g_malloc(current_srcbuf_length * sizeof(DTYPE COMPLEX)); \ - memcpy(srcbuf_data, (outdata - current_srcbuf_length), current_srcbuf_length * sizeof(DTYPE COMPLEX)); \ - srcbuf = gst_buffer_new_wrapped(srcbuf_data, current_srcbuf_length * sizeof(DTYPE COMPLEX)); \ + srcbuf_data = g_malloc(current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \ + memcpy(srcbuf_data, (outdata - current_srcbuf_length * element->channels), current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \ + srcbuf = gst_buffer_new_wrapped(srcbuf_data, current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \ \ if(G_UNLIKELY(!srcbuf)) { \ GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \ @@ -379,8 +366,8 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind if(srcbuf_gap_next != srcbuf_gap) { \ /* We need to reset our place in the input buffer */ \ offset++; \ - indata++; \ - outdata++; \ + indata += element->channels; \ + outdata += element->channels; \ current_srcbuf_length = 1; \ srcbuf_gap = srcbuf_gap_next; \ } else \ @@ -441,6 +428,7 @@ static gboolean sink_event(GstPad *pad, GstObject *parent, GstEvent *event) if(success) { /* record stream parameters */ element->rate = GST_AUDIO_INFO_RATE(&info); + element->channels = GST_AUDIO_INFO_CHANNELS(&info); element->unit_size = GST_AUDIO_INFO_BPF(&info); if(!strcmp(name, GST_AUDIO_NE(U32))) { element->data_type = GSTLAL_INSERTGAP_U32; @@ -935,6 +923,7 @@ static void gstlal_insertgap_init(GSTLALInsertGap *element) element->bad_data_intervals = NULL; element->array_length = 0; element->rate = 0; + element->channels = 0; element->unit_size = 0; element->last_sinkbuf_ets = 0; element->last_sinkbuf_offset_end = 0; diff --git a/gstlal-calibration/gst/lal/gstlal_insertgap.h b/gstlal-calibration/gst/lal/gstlal_insertgap.h index b100193090..df213921cf 100644 --- a/gstlal-calibration/gst/lal/gstlal_insertgap.h +++ b/gstlal-calibration/gst/lal/gstlal_insertgap.h @@ -67,6 +67,7 @@ struct _GSTLALInsertGap { /* stream parameters */ gint rate; + gint channels; gint unit_size; enum gstlal_insertgap_data_type { GSTLAL_INSERTGAP_U32 = 0, diff --git a/gstlal-calibration/python/calibration_parts.py b/gstlal-calibration/python/calibration_parts.py index c979058dff..a858382c1e 100644 --- a/gstlal-calibration/python/calibration_parts.py +++ b/gstlal-calibration/python/calibration_parts.py @@ -34,7 +34,7 @@ def mkqueue(pipeline, head, length = 0, min_length = 0): if length < 0: return head else: - return pipeparts.mkqueue(pipeline, head, max_size_time = int(1000000000 * length), max_size_buffers = 0, max_size_bytes = 0, min_threshold_time = min_length) + return pipeparts.mkqueue(pipeline, head, max_size_time = int(1000000000 * length), max_size_buffers = 0, max_size_bytes = 0, min_threshold_time = int(1000000000 * min_length)) def mkcomplexqueue(pipeline, head, length = 0, min_length = 0): head = pipeparts.mktogglecomplex(pipeline, head) @@ -758,7 +758,7 @@ def update_filters(filter_maker, arg, filter_taker, maker_prop_name, taker_prop_ firfilter = filter_maker.get_property(maker_prop_name)[filter_number][::-1] filter_taker.set_property(taker_prop_name, firfilter) -def clean_data(pipeline, signal, signal_rate, witnesses, witness_rate, fft_length, fft_overlap, num_ffts, update_samples, fir_length, frequency_resolution, obsready = None, attack_length = 0, filename = None): +def clean_data(pipeline, signal, signal_rate, witnesses, witness_rate, fft_length, fft_overlap, num_ffts, update_samples, fir_length, frequency_resolution, obsready = None, attack_length = 0, chop_time = 0.0, wait_time = 0.0, filename = None): # # Use witness channels that monitor the environment to remove environmental noise @@ -780,10 +780,12 @@ def clean_data(pipeline, signal, signal_rate, witnesses, witness_rate, fft_lengt transfer_functions = mkinterleave(pipeline, numpy.insert(witness_tees, 0, resampled_signal, axis = 0)) if obsready is not None: transfer_functions = mkgate(pipeline, transfer_functions, obsready, 1, attack_length = attack_length) + if(chop_time): + transfer_functions = pipeparts.mkgeneric(pipeline, transfer_functions, "lal_insertgap", chop_length = int(1000000000 * chop_time)) transfer_functions = pipeparts.mkgeneric(pipeline, transfer_functions, "lal_transferfunction", fft_length = fft_length, fft_overlap = fft_overlap, num_ffts = num_ffts, update_samples = update_samples, make_fir_filters = -1, fir_length = fir_length, frequency_resolution = frequency_resolution, high_pass = 9, update_after_gap = True, filename = filename) signal_minus_noise = [signal_tee] for i in range(0, len(witnesses)): - minus_noise = pipeparts.mkgeneric(pipeline, witness_tees[i], "lal_tdwhiten", kernel = default_fir_filter, latency = fir_length / 2, taper_length = 20 * fir_length) + minus_noise = pipeparts.mkgeneric(pipeline, mkqueue(pipeline, witness_tees[i], min_length = wait_time + 0.5), "lal_tdwhiten", kernel = default_fir_filter, latency = fir_length / 2, taper_length = 20 * fir_length) transfer_functions.connect("notify::fir-filters", update_filters, minus_noise, "fir_filters", "kernel", i) signal_minus_noise.append(mkresample(pipeline, minus_noise, 5, False, signal_rate)) -- GitLab