/* * Copyright (C) 2016 Aaron Viets <aaron.viets@ligo.org> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ /** * SECTION:gstlal_insertgap * @short_description: This element replaces undesired values, specified * by the property #bad_data_intervals, with gaps. * * The primary purpose of this element is to reject certain values * considered by the user to be "bad data." This can be done by flagging * that data as a gap, replacing it with a specified value, or both. The * criteria for bad data is specified by the array property * #bad_data_intervals. Array indices 0, 2, 4, etc., * represent maxima, and array indices 1, 3, 5, etc., represent the * corresponding minima. For example, if * #bad_data_intervals = [0, 1, 2, 3], * then any values that fall in one of the closed intervals [-inf, 0], * [1, 2], or [3, inf] will be rejected and gapped and/or replaced * as specified by the user. To reject a single value, say zero, * #bad_data_intervals should be [-max_double, 0, 0, max_double]. * If the data stream is complex, the real and and imaginary parts of * each input is tested, and if either is bad, the value is rejected. * The #bad_data_intervals and #replace_value properties are applied to * both real and imaginary parts. * This element also has the ability to fill in discontinuities if the * property #fill-discont is set to true. Presentation timestamps and * buffer offsets are adjusted as needed. */ /* * ============================================================================ * * Preamble * * ============================================================================ */ /* * stuff from C */ #include <math.h> #include <string.h> #include <complex.h> /* * stuff from gobject/gstreamer */ #include <glib.h> #include <gst/gst.h> #include <gst/audio/audio.h> /* * our own stuff */ #include <gstlal/gstlal.h> #include <gstlal/gstlal_debug.h> #include <gstlal/gstlal_audio_info.h> #include <gstlal_insertgap.h> /* * ============================================================================ * * GStreamer Boiler Plate * * ============================================================================ */ static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE( GST_BASE_TRANSFORM_SINK_NAME, GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS( "audio/x-raw, " \ "rate = (int) [1, MAX], " \ "channels = (int) [1, MAX], " \ "format = (string) { "GST_AUDIO_NE(U32)", " GST_AUDIO_NE(F32) ", " GST_AUDIO_NE(F64) ", " GST_AUDIO_NE(Z64) ", " GST_AUDIO_NE(Z128) " }, " \ "layout = (string) interleaved, " \ "channel-mask = (bitmask) 0" ) ); static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE( GST_BASE_TRANSFORM_SRC_NAME, GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS( "audio/x-raw, " \ "rate = (int) [1, MAX], " \ "channels = (int) [1, MAX], " \ "format = (string) { "GST_AUDIO_NE(U32)", " GST_AUDIO_NE(F32) ", " GST_AUDIO_NE(F64) ", " GST_AUDIO_NE(Z64) ", " GST_AUDIO_NE(Z128) " }, " \ "layout = (string) interleaved, " \ "channel-mask = (bitmask) 0" ) ); #define GST_CAT_DEFAULT gstlal_insertgap_debug GST_DEBUG_CATEGORY_STATIC(GST_CAT_DEFAULT); G_DEFINE_TYPE_WITH_CODE( GSTLALInsertGap, gstlal_insertgap, GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT(GST_CAT_DEFAULT, "lal_insertgap", 0, "lal_insertgap element") ); /* * ============================================================================ * * Utilities * * ============================================================================ */ #define DEFINE_CHECK_DATA(DTYPE) \ static gboolean check_data_ ## DTYPE(DTYPE *data, double *bad_data_intervals, int array_length, int num_checks, gboolean remove_nan, gboolean remove_inf) { \ int i, j; \ gboolean result = TRUE; \ for(i = 0; i < num_checks; i++) { \ if(bad_data_intervals) { \ for(j = 0; j < array_length; j += 2) \ result &= data[i] > bad_data_intervals[j] && data[i] < bad_data_intervals[j + 1]; \ } \ if(remove_nan) \ result &= !isnan(data[i]); \ if(remove_inf) \ result &= !isinf(data[i]); \ } \ return result; \ } DEFINE_CHECK_DATA(float); DEFINE_CHECK_DATA(double); DEFINE_CHECK_DATA(guint32); #define DEFINE_PROCESS_INBUF(DTYPE,COMPLEX) \ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *indata, DTYPE COMPLEX *outdata, GSTLALInsertGap *element, gboolean sinkbuf_gap, gboolean sinkbuf_discont, guint64 sinkbuf_offset, guint64 sinkbuf_offset_end, GstClockTime sinkbuf_dur, GstClockTime sinkbuf_pts, gboolean complex_data) \ { \ GstFlowReturn result = GST_FLOW_OK; \ guint64 blocks, max_block_length, missing_samples; \ missing_samples = 0; \ \ /* * First, deal with discontinuity if necessary */ \ if(element->fill_discont && (element->last_sinkbuf_offset_end != 0) && (sinkbuf_pts != element->last_sinkbuf_ets)) { \ \ /* Track discont length and number of zero-length buffers */ \ element->discont_time += (sinkbuf_pts - element->last_sinkbuf_ets); \ element->empty_bufs += (sinkbuf_dur ? 0 : 1); \ \ /* Find number of missing samples and max block length in samples */ \ missing_samples = gst_util_uint64_scale_int_round(sinkbuf_pts - element->last_sinkbuf_ets, element->rate, 1000000000); \ blocks = (sinkbuf_pts - element->last_sinkbuf_ets + element->block_duration - 1) / element->block_duration; /* ceil */ \ g_assert_cmpuint(blocks, >, 0); /* make sure that the discont is not zero length */ \ max_block_length = (missing_samples + blocks - 1) / blocks; /* ceil */ \ g_assert_cmpuint(max_block_length, >, 0); \ \ /* Message for debugging */ \ if(sinkbuf_dur && sinkbuf_offset != sinkbuf_offset_end) \ GST_WARNING_OBJECT(element, "filling discontinuity lasting %f seconds (%lu samples) including %lu zero-length buffers and starting at %f seconds (offset %lu)", (((double) element->discont_time) / 1000000000.0), gst_util_uint64_scale_int_round(element->discont_time, element->rate, 1000000000), element->empty_bufs, (double) sinkbuf_pts / 1000000000.0 - (double) element->discont_time / 1000000000.0, sinkbuf_offset); \ \ guint standard_blocks = (guint) (missing_samples / max_block_length); \ guint64 last_block_length = missing_samples % max_block_length; \ DTYPE COMPLEX sample_value; \ if(complex_data) \ sample_value = (element->replace_value < G_MAXDOUBLE) ? ((DTYPE) (element->replace_value)) * (1 + I) : 0; \ else \ sample_value = (element->replace_value < G_MAXDOUBLE) ? ((DTYPE) (element->replace_value)) : 0; \ \ /* first make and push any buffers of size max_buffer_size */ \ if(standard_blocks != 0) { \ guint buffer_num; \ for(buffer_num = 0; buffer_num < standard_blocks; buffer_num++) { \ GstBuffer *discont_buf; \ DTYPE COMPLEX *discont_buf_data; \ discont_buf_data = g_malloc(max_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ guint sample_num; \ for(sample_num = 0; sample_num < max_block_length * element->channels; sample_num++) { \ *discont_buf_data = sample_value; \ discont_buf_data++; \ } \ discont_buf = gst_buffer_new_wrapped((discont_buf_data - max_block_length * element->channels), max_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ if(G_UNLIKELY(!discont_buf)) { \ GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \ result = GST_FLOW_ERROR; \ goto done; \ } \ \ /* set flags, caps, offset, and timestamps. */ \ GST_BUFFER_OFFSET(discont_buf) = element->last_sinkbuf_offset_end + element->discont_offset + buffer_num * max_block_length; \ GST_BUFFER_OFFSET_END(discont_buf) = GST_BUFFER_OFFSET(discont_buf) + max_block_length; \ GST_BUFFER_PTS(discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, (guint64) buffer_num * max_block_length, missing_samples); \ GST_BUFFER_DURATION(discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, ((guint64) buffer_num + 1) * max_block_length, missing_samples) - GST_BUFFER_PTS(discont_buf); \ GST_BUFFER_FLAG_UNSET(discont_buf, GST_BUFFER_FLAG_DISCONT); \ if(element->insert_gap) \ GST_BUFFER_FLAG_SET(discont_buf, GST_BUFFER_FLAG_GAP); \ else \ GST_BUFFER_FLAG_UNSET(discont_buf, GST_BUFFER_FLAG_GAP); \ \ /* push buffer downstream */ \ GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(discont_buf)); \ result = gst_pad_push(element->srcpad, discont_buf); \ if(G_UNLIKELY(result != GST_FLOW_OK)) { \ GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result)); \ goto done; \ } \ } \ } \ \ /* then make and push the remainder buffer */ \ if(last_block_length != 0) { \ GstBuffer *last_discont_buf; \ DTYPE COMPLEX *last_discont_buf_data; \ last_discont_buf_data = g_malloc(last_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ guint sample_num; \ for(sample_num = 0; sample_num < last_block_length * element->channels; sample_num++) { \ *last_discont_buf_data = sample_value; \ last_discont_buf_data++; \ } \ last_discont_buf = gst_buffer_new_wrapped((last_discont_buf_data - last_block_length * element->channels), last_block_length * element->channels * sizeof(DTYPE COMPLEX)); \ if(G_UNLIKELY(!last_discont_buf)) { \ GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \ result = GST_FLOW_ERROR; \ goto done; \ } \ \ /* set flags, caps, offset, and timestamps. */ \ GST_BUFFER_OFFSET(last_discont_buf) = element->last_sinkbuf_offset_end + element->discont_offset + missing_samples - last_block_length; \ GST_BUFFER_OFFSET_END(last_discont_buf) = GST_BUFFER_OFFSET(last_discont_buf) + last_block_length; \ GST_BUFFER_PTS(last_discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, missing_samples - last_block_length, missing_samples); \ GST_BUFFER_DURATION(last_discont_buf) = sinkbuf_pts - GST_BUFFER_PTS(last_discont_buf); \ GST_BUFFER_FLAG_UNSET(last_discont_buf, GST_BUFFER_FLAG_DISCONT); \ if(element->insert_gap) \ GST_BUFFER_FLAG_SET(last_discont_buf, GST_BUFFER_FLAG_GAP); \ else \ GST_BUFFER_FLAG_UNSET(last_discont_buf, GST_BUFFER_FLAG_GAP); \ \ /* push buffer downstream */ \ GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(last_discont_buf)); \ result = gst_pad_push(element->srcpad, last_discont_buf); \ if(G_UNLIKELY(result != GST_FLOW_OK)) { \ GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result)); \ goto done; \ } \ } \ element->discont_offset += missing_samples; \ } \ if(!sinkbuf_dur) \ goto done; \ \ element->discont_time = 0; \ element->empty_bufs = 0; \ \ /* * Now, use data on input buffer to make next output buffer(s) */ \ gboolean data_is_bad, srcbuf_gap, srcbuf_gap_next; \ guint64 offset, current_srcbuf_length; \ current_srcbuf_length = 0; \ \ /* compute length of incoming buffer and maximum block length in samples */ \ blocks = (sinkbuf_dur + element->block_duration - 1) / element->block_duration; /* ceil */ \ g_assert_cmpuint(blocks, >, 0); /* make sure that the sinkbuf is not zero length */ \ guint64 length = sinkbuf_offset_end - sinkbuf_offset; \ max_block_length = (length + blocks - 1) / blocks; /* ceil */ \ g_assert_cmpuint(max_block_length, >, 0); \ \ /* Check first sample */ \ data_is_bad = !check_data_ ## DTYPE((DTYPE *) indata, element->bad_data_intervals, element->array_length, (1 + (int) complex_data) * element->channels, element->remove_nan, element->remove_inf); \ srcbuf_gap = (sinkbuf_gap && (!(element->remove_gap))) || ((element->insert_gap) && data_is_bad); \ for(offset = 0; offset < length; offset++) { \ data_is_bad = !check_data_ ## DTYPE((DTYPE *) indata, element->bad_data_intervals, element->array_length, (1 + (int) complex_data) * element->channels, element->remove_nan, element->remove_inf); \ srcbuf_gap_next = (sinkbuf_gap && (!(element->remove_gap))) || ((element->insert_gap) && data_is_bad); \ if(complex_data) \ *outdata = (((element->replace_value) < G_MAXDOUBLE) && data_is_bad) ? ((DTYPE) (element->replace_value)) * (1 + I) : *indata; \ else \ *outdata = (((element->replace_value) < G_MAXDOUBLE) && data_is_bad) ? (DTYPE) (element->replace_value) : *indata; \ current_srcbuf_length++; \ indata += element->channels; \ outdata += element->channels; \ \ /* * We need to push an output buffer if: * 1) The number of samples to be output equals the maximum output buffer size * 2) We have reached the end of the input buffer * 3) The output data changes from non-gap to gap or vice-versa */ \ if((current_srcbuf_length >= max_block_length) || (offset >= length - 1) || (srcbuf_gap_next != srcbuf_gap)) { \ if((current_srcbuf_length > max_block_length) || (offset > length - 1)) \ g_assert_not_reached(); \ if(srcbuf_gap_next != srcbuf_gap) { \ /* * In this case, we don't want the most recent sample since its * gap state is different. Put it on the next output buffer. */ \ offset--; \ current_srcbuf_length--; \ indata -= element->channels; \ outdata -= element->channels; \ } \ GstBuffer *srcbuf; \ DTYPE COMPLEX *srcbuf_data; \ srcbuf_data = g_malloc(current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \ memcpy(srcbuf_data, (outdata - current_srcbuf_length * element->channels), current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \ srcbuf = gst_buffer_new_wrapped(srcbuf_data, current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \ \ if(G_UNLIKELY(!srcbuf)) { \ GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \ result = GST_FLOW_ERROR; \ goto done; \ } \ \ /* set flags, caps, offset, and timestamps. */ \ GST_BUFFER_OFFSET(srcbuf) = sinkbuf_offset + element->discont_offset + offset + 1 - current_srcbuf_length; \ GST_BUFFER_OFFSET_END(srcbuf) = GST_BUFFER_OFFSET(srcbuf) + current_srcbuf_length; \ GST_BUFFER_PTS(srcbuf) = sinkbuf_pts + gst_util_uint64_scale_int_round(sinkbuf_dur, offset + 1 - current_srcbuf_length, length); \ GST_BUFFER_DURATION(srcbuf) = sinkbuf_pts + gst_util_uint64_scale_int_round(sinkbuf_dur, offset + 1, length) - GST_BUFFER_PTS(srcbuf); \ if(srcbuf_gap) \ GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_GAP); \ else \ GST_BUFFER_FLAG_UNSET(srcbuf, GST_BUFFER_FLAG_GAP); \ \ /* * only the first subbuffer of a buffer flagged as a * discontinuity is a discontinuity. */ \ if(sinkbuf_discont && (offset + 1 - current_srcbuf_length == 0) && ((!(element->fill_discont)) || (element->last_sinkbuf_ets == 0))) \ GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_DISCONT); \ else \ GST_BUFFER_FLAG_UNSET(srcbuf, GST_BUFFER_FLAG_DISCONT); \ if(srcbuf_gap_next != srcbuf_gap) { \ /* We need to reset our place in the input buffer */ \ offset++; \ indata += element->channels; \ outdata += element->channels; \ current_srcbuf_length = 1; \ srcbuf_gap = srcbuf_gap_next; \ } else \ current_srcbuf_length = 0; \ \ /* push buffer downstream */ \ GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(srcbuf)); \ result = gst_pad_push(element->srcpad, srcbuf); \ if(G_UNLIKELY(result != GST_FLOW_OK)) { \ GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result)); \ goto done; \ } \ } \ } \ done: \ element->last_sinkbuf_ets = sinkbuf_pts + sinkbuf_dur; \ element->last_sinkbuf_offset_end = sinkbuf_offset_end ? sinkbuf_offset_end : element->last_sinkbuf_offset_end; \ return result; \ } DEFINE_PROCESS_INBUF(guint32, ) DEFINE_PROCESS_INBUF(float, ) DEFINE_PROCESS_INBUF(double, ) DEFINE_PROCESS_INBUF(float,complex) DEFINE_PROCESS_INBUF(double,complex) /* * ============================================================================ * * Sink Pad * * ============================================================================ */ /* * sink_event() */ static gboolean sink_event(GstPad *pad, GstObject *parent, GstEvent *event) { GSTLALInsertGap *element = GSTLAL_INSERTGAP(parent); gboolean success = TRUE; GST_DEBUG_OBJECT(pad, "Got %s event on sink pad", GST_EVENT_TYPE_NAME(event)); switch(GST_EVENT_TYPE(event)) { case GST_EVENT_CAPS: { GstCaps *caps; GstAudioInfo info; gst_event_parse_caps(event, &caps); success &= gstlal_audio_info_from_caps(&info, caps); GstStructure *str = gst_caps_get_structure(caps, 0); const gchar *name = gst_structure_get_string(str, "format"); success &= (name != NULL); if(success) { /* record stream parameters */ element->rate = GST_AUDIO_INFO_RATE(&info); element->channels = GST_AUDIO_INFO_CHANNELS(&info); element->unit_size = GST_AUDIO_INFO_BPF(&info); if(!strcmp(name, GST_AUDIO_NE(U32))) { element->data_type = GSTLAL_INSERTGAP_U32; g_assert_cmpuint(element->unit_size, ==, 4); } else if(!strcmp(name, GST_AUDIO_NE(F32))) { element->data_type = GSTLAL_INSERTGAP_F32; g_assert_cmpuint(element->unit_size, ==, 4); } else if(!strcmp(name, GST_AUDIO_NE(F64))) { element->data_type = GSTLAL_INSERTGAP_F64; g_assert_cmpuint(element->unit_size, ==, 8); } else if(!strcmp(name, GST_AUDIO_NE(Z64))) { element->data_type = GSTLAL_INSERTGAP_Z64; g_assert_cmpuint(element->unit_size, ==, 8); } else if(!strcmp(name, GST_AUDIO_NE(Z128))) { element->data_type = GSTLAL_INSERTGAP_Z128; g_assert_cmpuint(element->unit_size, ==, 16); } else g_assert_not_reached(); } break; } default: break; } if(!success) { gst_event_unref(event); } else { success = gst_pad_event_default(pad, parent, event); } return success; } /* * chain() */ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) { GSTLALInsertGap *element = GSTLAL_INSERTGAP(parent); GstFlowReturn result = GST_FLOW_OK; GST_DEBUG_OBJECT(element, "received %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(sinkbuf)); if(GST_BUFFER_PTS_IS_VALID(sinkbuf)) { /* Set the timestamp of the first output sample) */ if(element->t0 == GST_CLOCK_TIME_NONE) element->t0 = GST_BUFFER_PTS(sinkbuf) + element->chop_length; /* If we are throwing away any initial data, do it now, and send a zero-length buffer downstream to let other elements know when to expect the first buffer */ if(element->chop_length && GST_BUFFER_PTS(sinkbuf) + GST_BUFFER_DURATION(sinkbuf) <= element->t0) { GstBuffer *srcbuf = gst_buffer_new(); GST_BUFFER_OFFSET(srcbuf) = GST_BUFFER_OFFSET(sinkbuf) + gst_util_uint64_scale_round(element->t0 - GST_BUFFER_PTS(sinkbuf), (guint64) element->rate, 1000000000); GST_BUFFER_OFFSET_END(srcbuf) = GST_BUFFER_OFFSET(srcbuf); GST_BUFFER_PTS(srcbuf) = element->t0; GST_BUFFER_DURATION(srcbuf) = 0; result = gst_pad_push(element->srcpad, srcbuf); gst_buffer_unref(sinkbuf); goto done; } else if(GST_BUFFER_PTS(sinkbuf) < element->t0) { guint64 size_removed = element->unit_size * gst_util_uint64_scale_round(element->t0 - GST_BUFFER_PTS(sinkbuf), (guint64) element->rate, 1000000000); guint64 time_removed = gst_util_uint64_scale_round(size_removed / element->unit_size, 1000000000, (guint64) element->rate); guint64 newsize = element->unit_size * (GST_BUFFER_OFFSET_END(sinkbuf) - GST_BUFFER_OFFSET(sinkbuf)) - size_removed; gst_buffer_resize(sinkbuf, size_removed, newsize); GST_BUFFER_OFFSET(sinkbuf) = GST_BUFFER_OFFSET(sinkbuf) + size_removed / element->unit_size; GST_BUFFER_PTS(sinkbuf) = GST_BUFFER_PTS(sinkbuf) + time_removed; GST_BUFFER_DURATION(sinkbuf) = GST_BUFFER_DURATION(sinkbuf) - time_removed; } } /* if buffer does not possess valid metadata or is zero length and we are not filling in discontinuities, push gap downstream */ if(!(GST_BUFFER_PTS_IS_VALID(sinkbuf) && GST_BUFFER_DURATION_IS_VALID(sinkbuf) && GST_BUFFER_OFFSET_IS_VALID(sinkbuf) && GST_BUFFER_OFFSET_END_IS_VALID(sinkbuf)) || (!element->fill_discont && (GST_BUFFER_DURATION(sinkbuf) == 0 || GST_BUFFER_OFFSET(sinkbuf) == GST_BUFFER_OFFSET_END(sinkbuf)))) { GST_DEBUG_OBJECT(element, "pushing gap buffer at timestamp %lu seconds", (long unsigned) GST_TIME_AS_SECONDS(GST_BUFFER_PTS(sinkbuf))); GstBuffer *srcbuf; srcbuf = gst_buffer_copy(sinkbuf); GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_GAP); result = gst_pad_push(element->srcpad, srcbuf); if(G_UNLIKELY(result != GST_FLOW_OK)) GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result)); goto done; } /* if buffer is zero length and we are filling in discontinuities, fill it in, unless it has no valid timestamp. */ if(element->fill_discont && (GST_BUFFER_DURATION(sinkbuf) == 0 || GST_BUFFER_OFFSET(sinkbuf) == GST_BUFFER_OFFSET_END(sinkbuf))) { if(GST_BUFFER_PTS_IS_VALID(sinkbuf) && GST_BUFFER_PTS(sinkbuf) > element->last_sinkbuf_ets) { switch(element->data_type) { case GSTLAL_INSERTGAP_U32: result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE); break; case GSTLAL_INSERTGAP_F32: result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE); break; case GSTLAL_INSERTGAP_F64: result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE); break; case GSTLAL_INSERTGAP_Z64: result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE); break; case GSTLAL_INSERTGAP_Z128: result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE); break; default: g_assert_not_reached(); } } else { GST_DEBUG_OBJECT(element, "dropping zero length buffer at timestamp %lu seconds", (long unsigned) GST_TIME_AS_SECONDS(GST_BUFFER_PTS(sinkbuf))); gst_buffer_unref(sinkbuf); } goto done; } GstMapInfo inmap; gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ); /* We'll need these to decide gaps, offsets, and timestamps on the outgoing buffer(s) */ gboolean sinkbuf_gap = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_GAP); gboolean sinkbuf_discont = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_DISCONT); guint64 sinkbuf_offset = GST_BUFFER_OFFSET(sinkbuf); guint64 sinkbuf_offset_end = GST_BUFFER_OFFSET_END(sinkbuf); GstClockTime sinkbuf_dur = GST_BUFFER_DURATION(sinkbuf); GstClockTime sinkbuf_pts = GST_BUFFER_PTS(sinkbuf); g_assert_cmpuint(inmap.size % element->unit_size, ==, 0); if(!element->chop_length || sinkbuf_pts > element->t0) g_assert_cmpuint((sinkbuf_offset_end - sinkbuf_offset), ==, inmap.size / element->unit_size); /* sanity checks */ /* outdata will be filled with the data that goes on the outgoing buffer(s) */ void *outdata; outdata = g_malloc((sinkbuf_offset_end - sinkbuf_offset) * element->unit_size); switch(element->data_type) { case GSTLAL_INSERTGAP_U32: result = process_inbuf_guint32((guint32 *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); break; case GSTLAL_INSERTGAP_F32: result = process_inbuf_float((float *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); break; case GSTLAL_INSERTGAP_F64: result = process_inbuf_double((double *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); break; case GSTLAL_INSERTGAP_Z64: result = process_inbuf_floatcomplex((float complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); break; case GSTLAL_INSERTGAP_Z128: result = process_inbuf_doublecomplex((double complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); break; default: g_assert_not_reached(); } g_free(outdata); outdata = NULL; gst_buffer_unmap(sinkbuf, &inmap); gst_buffer_unref(sinkbuf); /* * done */ done: return result; } /* * ============================================================================ * * GObject Method Overrides * * ============================================================================ */ /* * properties */ enum property { ARG_INSERT_GAP = 1, ARG_REMOVE_GAP, ARG_REMOVE_NAN, ARG_REMOVE_INF, ARG_FILL_DISCONT, ARG_REPLACE_VALUE, ARG_BAD_DATA_INTERVALS, ARG_BLOCK_DURATION, ARG_CHOP_LENGTH }; static void set_property(GObject *object, enum property prop_id, const GValue *value, GParamSpec *pspec) { GSTLALInsertGap *element = GSTLAL_INSERTGAP(object); GST_OBJECT_LOCK(element); switch (prop_id) { case ARG_INSERT_GAP: element->insert_gap = g_value_get_boolean(value); break; case ARG_REMOVE_GAP: element->remove_gap = g_value_get_boolean(value); break; case ARG_REMOVE_NAN: element->remove_nan = g_value_get_boolean(value); break; case ARG_REMOVE_INF: element->remove_inf = g_value_get_boolean(value); break; case ARG_FILL_DISCONT: element->fill_discont = g_value_get_boolean(value); break; case ARG_REPLACE_VALUE: element->replace_value = g_value_get_double(value); break; case ARG_BAD_DATA_INTERVALS: { if(element->bad_data_intervals) { g_free(element->bad_data_intervals); element->bad_data_intervals = NULL; } element->bad_data_intervals = gstlal_doubles_from_g_value_array(g_value_get_boxed(value), NULL, &element->array_length); if(element->array_length % 2) GST_ERROR_OBJECT(element, "Array length for property bad_data_intervals must be even"); break; } case ARG_BLOCK_DURATION: element->block_duration = g_value_get_uint64(value); break; case ARG_CHOP_LENGTH: element->chop_length = g_value_get_uint64(value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } GST_OBJECT_UNLOCK(element); } static void get_property(GObject *object, enum property prop_id, GValue *value, GParamSpec *pspec) { GSTLALInsertGap *element = GSTLAL_INSERTGAP(object); GST_OBJECT_LOCK(element); switch (prop_id) { case ARG_INSERT_GAP: g_value_set_boolean(value, element->insert_gap); break; case ARG_REMOVE_GAP: g_value_set_boolean(value, element->remove_gap); break; case ARG_REMOVE_NAN: g_value_set_boolean(value, element->remove_nan); break; case ARG_REMOVE_INF: g_value_set_boolean(value, element->remove_inf); break; case ARG_FILL_DISCONT: g_value_set_boolean(value, element->fill_discont); break; case ARG_REPLACE_VALUE: g_value_set_double(value, element->replace_value); break; case ARG_BAD_DATA_INTERVALS: g_value_take_boxed(value, gstlal_g_value_array_from_doubles(element->bad_data_intervals, element->array_length)); break; case ARG_BLOCK_DURATION: g_value_set_uint64(value, element->block_duration); break; case ARG_CHOP_LENGTH: g_value_set_uint64(value, element->chop_length); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } GST_OBJECT_UNLOCK(element); } /* * finalize() */ static void finalize(GObject *object) { GSTLALInsertGap *element = GSTLAL_INSERTGAP(object); gst_object_unref(element->sinkpad); element->sinkpad = NULL; gst_object_unref(element->srcpad); element->srcpad = NULL; g_free(element->bad_data_intervals); element->bad_data_intervals = NULL; G_OBJECT_CLASS(gstlal_insertgap_parent_class)->finalize(object); } /* * class_init() */ static void gstlal_insertgap_class_init(GSTLALInsertGapClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS(klass); GstElementClass *element_class = GST_ELEMENT_CLASS(klass); gst_element_class_set_details_simple( element_class, "Replace unwanted data with gaps", "Filter", "Replace unwanted data, specified with the property bad-data-intervals, with gaps.\n\t\t\t " "Also can replace with another value, given by replace-value. Can also remove gaps\n\t\t\t " "where data is acceptable, and fill in discontinuities if desired.", "Aaron Viets <aaron.viets@ligo.org>" ); gobject_class->set_property = GST_DEBUG_FUNCPTR(set_property); gobject_class->get_property = GST_DEBUG_FUNCPTR(get_property); gobject_class->finalize = GST_DEBUG_FUNCPTR(finalize); gst_element_class_add_pad_template(element_class, gst_static_pad_template_get(&src_factory)); gst_element_class_add_pad_template(element_class, gst_static_pad_template_get(&sink_factory)); g_object_class_install_property( gobject_class, ARG_INSERT_GAP, g_param_spec_boolean( "insert-gap", "Insert gap", "If set to true (default), any data fitting the criteria specified by the property\n\t\t\t" "bad-data-intervals is replaced with gaps. Also, NaN's and inf's are replaced with\n\t\t\t" "gaps if the properties remove-nan and remove-inf are set to true, respectively.", TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); g_object_class_install_property( gobject_class, ARG_REMOVE_GAP, g_param_spec_boolean( "remove-gap", "Remove gap", "If set to true, any data in an input gap buffer that does not fit the criteria\n\t\t\t" "specified by the property bad-data-intervals will be marked as non-gap. If the\n\t\t\t" "property insert-gap is false and remove-gap is true, gaps with unacceptable\n\t\t\t" "data will be replaced by the value specified by the property replace-value.", FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); g_object_class_install_property( gobject_class, ARG_REMOVE_NAN, g_param_spec_boolean( "remove-nan", "Remove NaN", "If set to true (default), NaN's in the data stream will be replaced with gaps\n\t\t\t" "and/or the replace-value, as specified by user.", TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); g_object_class_install_property( gobject_class, ARG_REMOVE_INF, g_param_spec_boolean( "remove-inf", "Remove inf", "If set to true (default), infinities in the data stream will be replaced with\n\t\t\t" "gaps and/or the replace-value, as specified by user.", TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); g_object_class_install_property( gobject_class, ARG_FILL_DISCONT, g_param_spec_boolean( "fill-discont", "Fill discontinuity", "If set to true, discontinuities in the data stream will be filled with the\n\t\t\t" "replace-value (if set, otherwise 0), and gapped if insert-gap is true.", FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); g_object_class_install_property( gobject_class, ARG_REPLACE_VALUE, g_param_spec_double( "replace-value", "Replace value", "If set, this value is used to replace any data that fits the criteria\n\t\t\t" "specified by the property bad-data-intervals. If unset, values are not replaced.", -G_MAXDOUBLE, G_MAXDOUBLE, G_MAXDOUBLE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); g_object_class_install_property( gobject_class, ARG_BAD_DATA_INTERVALS, g_param_spec_value_array( "bad-data-intervals", "Bad data intervals", "Array containing minima and maxima of closed intervals in which data is\n\t\t\t" "considered unacceptable and will be replaced with gaps and/or the replace-value.\n\t\t\t" "Array indices 0, 2, 4, etc., represent maxima, and array indices 1, 3, 5, etc.,\n\t\t\t" "represent the corresponding minima.", g_param_spec_double( "coefficient", "Coefficient", "Coefficient", -G_MAXDOUBLE, G_MAXDOUBLE, 0.0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS ), G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS ) ); g_object_class_install_property( gobject_class, ARG_BLOCK_DURATION, g_param_spec_uint64( "block-duration", "Block duration", "Maximum output buffer duration in nanoseconds. Buffers may be smaller than this.\n\t\t\t" "Default is to not change buffer length except as required by added/removed gaps.", 0, G_MAXUINT64, G_MAXUINT64 / 2, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); g_object_class_install_property( gobject_class, ARG_CHOP_LENGTH, g_param_spec_uint64( "chop-length", "Chop length", "Amount of initial data to throw away before producing output data, in nanoseconds.", 0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); } /* * instance init */ static void gstlal_insertgap_init(GSTLALInsertGap *element) { GstPad *pad; gst_element_create_all_pads(GST_ELEMENT(element)); /* configure (and ref) sink pad */ pad = gst_element_get_static_pad(GST_ELEMENT(element), "sink"); gst_pad_set_event_function(pad, GST_DEBUG_FUNCPTR(sink_event)); gst_pad_set_chain_function(pad, GST_DEBUG_FUNCPTR(chain)); GST_PAD_SET_PROXY_CAPS(pad); GST_PAD_SET_PROXY_ALLOCATION(pad); GST_PAD_SET_PROXY_SCHEDULING(pad); element->sinkpad = pad; /* retrieve (and ref) src pad */ pad = gst_element_get_static_pad(GST_ELEMENT(element), "src"); GST_PAD_SET_PROXY_CAPS(pad); GST_PAD_SET_PROXY_ALLOCATION(pad); GST_PAD_SET_PROXY_SCHEDULING(pad); element->srcpad = pad; /* internal data */ element->t0 = GST_CLOCK_TIME_NONE; element->bad_data_intervals = NULL; element->array_length = 0; element->rate = 0; element->channels = 0; element->unit_size = 0; element->last_sinkbuf_ets = 0; element->last_sinkbuf_offset_end = 0; element->discont_offset = 0; element->discont_time = 0; element->empty_bufs = 0; }