Skip to content
Snippets Groups Projects
Forked from lscsoft / GstLAL
3058 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gstlal_insertgap.c 33.82 KiB
/*
 * Copyright (C) 2016 Aaron Viets <aaron.viets@ligo.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */


/**
 * SECTION:gstlal_insertgap
 * @short_description: This element replaces undesired values, specified
 * by the property #bad_data_intervals, with gaps.
 *
 * The primary purpose of this element is to reject certain values
 * considered by the user to be "bad data." This can be done by flagging
 * that data as a gap, replacing it with a specified value, or both. The
 * criteria for bad data are specified by the array property
 * #bad_data_intervals. Array indices 0, 2, 4, etc., are the maxima
 * (upper ends) of the bad-data intervals, and array indices 1, 3, 5,
 * etc., are the minima (lower ends) of the bad-data intervals that
 * follow them. For example, if
 * #bad_data_intervals = [0, 1, 2, 3],
 * then any values that fall in one of the closed intervals [-inf, 0],
 * [1, 2], or [3, inf] will be rejected and gapped and/or replaced
 * as specified by the user. To reject a single value, say zero,
 * #bad_data_intervals should be [-max_double, 0, 0, max_double].
 * If the data stream is complex, the real and imaginary parts of
 * each input are tested, and if either is bad, the value is rejected.
 * The #bad_data_intervals and #replace_value properties are applied to
 * both real and imaginary parts.
 * This element also has the ability to fill in discontinuities if the
 * property #fill-discont is set to true. Presentation timestamps and
 * buffer offsets are adjusted as needed.
 */


/*
 * ============================================================================
 *
 *				  Preamble
 *
 * ============================================================================
 */


/*
 * stuff from C
 */


#include <math.h>
#include <string.h>
#include <complex.h>


/*
 * stuff from gobject/gstreamer
 */

#include <glib.h>
#include <gst/gst.h>
#include <gst/audio/audio.h>


/*
 * our own stuff
 */


#include <gstlal/gstlal.h>
#include <gstlal/gstlal_debug.h>
#include <gstlal/gstlal_audio_info.h>
#include <gstlal_insertgap.h>


/*
 * ============================================================================
 *
 *			   GStreamer Boiler Plate
 *
 * ============================================================================
 */


/*
 * Sink pad template: an always-present pad that accepts interleaved
 * audio/x-raw with any positive rate and channel count, in native-endian
 * U32, F32, F64, Z64 or Z128 format.  chain() handles Z64 data as
 * float complex and Z128 data as double complex.
 */
static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE(
	GST_BASE_TRANSFORM_SINK_NAME,
	GST_PAD_SINK,
	GST_PAD_ALWAYS,
	GST_STATIC_CAPS(
		"audio/x-raw, " \
		"rate = (int) [1, MAX], " \
		"channels = (int) [1, MAX], " \
		"format = (string) { "GST_AUDIO_NE(U32)", " GST_AUDIO_NE(F32) ", " GST_AUDIO_NE(F64) ", " GST_AUDIO_NE(Z64) ", " GST_AUDIO_NE(Z128) " }, " \
		"layout = (string) interleaved, " \
		"channel-mask = (bitmask) 0"
	)
);


/*
 * Source pad template: advertises exactly the same caps as the sink pad
 * template.
 */
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE(
	GST_BASE_TRANSFORM_SRC_NAME,
	GST_PAD_SRC,
	GST_PAD_ALWAYS,
	GST_STATIC_CAPS(
		"audio/x-raw, " \
		"rate = (int) [1, MAX], " \
		"channels = (int) [1, MAX], " \
		"format = (string) { "GST_AUDIO_NE(U32)", " GST_AUDIO_NE(F32) ", " GST_AUDIO_NE(F64) ", " GST_AUDIO_NE(Z64) ", " GST_AUDIO_NE(Z128) " }, " \
		"layout = (string) interleaved, " \
		"channel-mask = (bitmask) 0"
	)
);


/*
 * Debug category used by every GST_*_OBJECT() logging call in this file.
 */
#define GST_CAT_DEFAULT gstlal_insertgap_debug
GST_DEBUG_CATEGORY_STATIC(GST_CAT_DEFAULT);


/*
 * Register the GSTLALInsertGap type as a direct subclass of GstElement.
 * The extra code initializes the "lal_insertgap" debug category when the
 * type is registered.
 */
G_DEFINE_TYPE_WITH_CODE(
	GSTLALInsertGap,
	gstlal_insertgap,
	GST_TYPE_ELEMENT,
	GST_DEBUG_CATEGORY_INIT(GST_CAT_DEFAULT, "lal_insertgap", 0, "lal_insertgap element")
);


/*
 * ============================================================================
 *
 *			       Utilities
 *
 * ============================================================================
 */


/*
 * DEFINE_CHECK_DATA(DTYPE) generates check_data_DTYPE(), which tests
 * num_checks consecutive values starting at data and returns TRUE only if
 * every one of them is acceptable.
 *
 * bad_data_intervals (array_length entries, may be NULL) follows the layout
 * described in the element documentation:  entry 0 is the upper end of the
 * bad interval [-inf, a[0]], each following pair (a[1], a[2]),
 * (a[3], a[4]), ... is a closed bad interval, and the last entry is the
 * lower end of the bad interval [a[last], inf].  E.g. [0, 1, 2, 3] rejects
 * [-inf, 0], [1, 2] and [3, inf], so only values in (0, 1) or (2, 3) pass.
 *
 * Arrays with fewer than two entries are ignored.  If the length is odd,
 * the last entry is still used as the final lower bound and the unpaired
 * interior entry before it is ignored, so no read ever goes past the end of
 * the array.
 *
 * A value that fails any comparison against the outer bounds (this
 * includes NaN) is rejected whenever intervals are set.  remove_nan and
 * remove_inf additionally reject NaN and +/-inf regardless of intervals.
 */
#define DEFINE_CHECK_DATA(DTYPE) \
static gboolean check_data_ ## DTYPE(DTYPE *data, double *bad_data_intervals, int array_length, int num_checks, gboolean remove_nan, gboolean remove_inf) { \
	int i, j; \
	gboolean result = TRUE; \
	for(i = 0; i < num_checks; i++) { \
		if(bad_data_intervals && array_length >= 2) { \
			/* outer bad intervals [-inf, first] and [last, inf]; phrased as "strictly inside" so that NaN fails too */ \
			result &= data[i] > bad_data_intervals[0] && data[i] < bad_data_intervals[array_length - 1]; \
			/* interior closed bad intervals [a[j], a[j + 1]] for odd j */ \
			for(j = 1; j + 1 < array_length - 1; j += 2) \
				result &= !(data[i] >= bad_data_intervals[j] && data[i] <= bad_data_intervals[j + 1]); \
		} \
		if(remove_nan) \
			result &= !isnan(data[i]); \
		if(remove_inf) \
			result &= !isinf(data[i]); \
	} \
	return result; \
}


DEFINE_CHECK_DATA(float)
DEFINE_CHECK_DATA(double)
DEFINE_CHECK_DATA(guint32)


#define DEFINE_PROCESS_INBUF(DTYPE,COMPLEX) \
static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *indata, DTYPE COMPLEX *outdata, GSTLALInsertGap *element, gboolean sinkbuf_gap, gboolean sinkbuf_discont, guint64 sinkbuf_offset, guint64 sinkbuf_offset_end, GstClockTime sinkbuf_dur, GstClockTime sinkbuf_pts, gboolean complex_data) \
{ \
	GstFlowReturn result = GST_FLOW_OK; \
	guint64 blocks, max_block_length, missing_samples; \
	missing_samples = 0; \
 \
	/*
	 * First, deal with discontinuity if necessary
	 */ \
	if(element->fill_discont && (element->last_sinkbuf_offset_end != 0) && (sinkbuf_pts != element->last_sinkbuf_ets)) { \
 \
		/* Track discont length and number of zero-length buffers */ \
		element->discont_time += (sinkbuf_pts - element->last_sinkbuf_ets); \
		element->empty_bufs += (sinkbuf_dur ? 0 : 1); \
 \
		/* Find number of missing samples and max block length in samples */ \
		missing_samples = gst_util_uint64_scale_int_round(sinkbuf_pts - element->last_sinkbuf_ets, element->rate, 1000000000); \
		blocks = (sinkbuf_pts - element->last_sinkbuf_ets + element->block_duration - 1) / element->block_duration; /* ceil */ \
		g_assert_cmpuint(blocks, >, 0); /* make sure that the discont is not zero length */ \
		max_block_length = (missing_samples + blocks - 1) / blocks; /* ceil */ \
		g_assert_cmpuint(max_block_length, >, 0); \
 \
		/* Message for debugging */ \
		if(sinkbuf_dur && sinkbuf_offset != sinkbuf_offset_end) \
			GST_WARNING_OBJECT(element, "filling discontinuity lasting %f seconds (%lu samples) including %lu zero-length buffers and starting at %f seconds (offset %lu)", (((double) element->discont_time) / 1000000000.0), gst_util_uint64_scale_int_round(element->discont_time, element->rate, 1000000000), element->empty_bufs, (double) sinkbuf_pts / 1000000000.0 - (double) element->discont_time / 1000000000.0, sinkbuf_offset); \
 \
		guint standard_blocks = (guint) (missing_samples / max_block_length); \
		guint64 last_block_length = missing_samples % max_block_length; \
		DTYPE COMPLEX sample_value; \
		if(complex_data) \
			sample_value = (element->replace_value < G_MAXDOUBLE) ? ((DTYPE) (element->replace_value)) * (1 + I) : 0; \
		else \
			sample_value = (element->replace_value < G_MAXDOUBLE) ? ((DTYPE) (element->replace_value)) : 0; \
 \
		/* first make and push any buffers of size max_buffer_size */ \
		if(standard_blocks != 0) { \
			guint buffer_num; \
			for(buffer_num = 0; buffer_num < standard_blocks; buffer_num++) { \
				GstBuffer *discont_buf; \
				DTYPE COMPLEX *discont_buf_data; \
				discont_buf_data = g_malloc(max_block_length * element->channels * sizeof(DTYPE COMPLEX)); \
				guint sample_num; \
				for(sample_num = 0; sample_num < max_block_length * element->channels; sample_num++) { \
					*discont_buf_data = sample_value; \
					discont_buf_data++; \
				} \
				discont_buf = gst_buffer_new_wrapped((discont_buf_data - max_block_length * element->channels), max_block_length * element->channels * sizeof(DTYPE COMPLEX)); \
				if(G_UNLIKELY(!discont_buf)) { \
					GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \
					result = GST_FLOW_ERROR; \
					goto done; \
				} \
 \
				/* set flags, caps, offset, and timestamps. */ \
				GST_BUFFER_OFFSET(discont_buf) = element->last_sinkbuf_offset_end + element->discont_offset + buffer_num * max_block_length; \
				GST_BUFFER_OFFSET_END(discont_buf) = GST_BUFFER_OFFSET(discont_buf) + max_block_length; \
				GST_BUFFER_PTS(discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, (guint64) buffer_num * max_block_length, missing_samples); \
				GST_BUFFER_DURATION(discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, ((guint64) buffer_num + 1) * max_block_length, missing_samples) - GST_BUFFER_PTS(discont_buf); \
				GST_BUFFER_FLAG_UNSET(discont_buf, GST_BUFFER_FLAG_DISCONT); \
				if(element->insert_gap) \
					GST_BUFFER_FLAG_SET(discont_buf, GST_BUFFER_FLAG_GAP); \
				else \
					GST_BUFFER_FLAG_UNSET(discont_buf, GST_BUFFER_FLAG_GAP); \
 \
				/* push buffer downstream */ \
				GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(discont_buf)); \
				result = gst_pad_push(element->srcpad, discont_buf); \
				if(G_UNLIKELY(result != GST_FLOW_OK)) { \
					GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result)); \
					goto done; \
				} \
			} \
		} \
 \
		/* then make and push the remainder buffer */ \
		if(last_block_length != 0) { \
			GstBuffer *last_discont_buf; \
			DTYPE COMPLEX *last_discont_buf_data; \
			last_discont_buf_data = g_malloc(last_block_length * element->channels * sizeof(DTYPE COMPLEX)); \
			guint sample_num; \
			for(sample_num = 0; sample_num < last_block_length * element->channels; sample_num++) { \
				*last_discont_buf_data = sample_value; \
				last_discont_buf_data++; \
			} \
			last_discont_buf = gst_buffer_new_wrapped((last_discont_buf_data - last_block_length * element->channels), last_block_length * element->channels * sizeof(DTYPE COMPLEX)); \
			if(G_UNLIKELY(!last_discont_buf)) { \
				GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \
				result = GST_FLOW_ERROR; \
				goto done; \
			} \
 \
			/* set flags, caps, offset, and timestamps. */ \
			GST_BUFFER_OFFSET(last_discont_buf) = element->last_sinkbuf_offset_end + element->discont_offset + missing_samples - last_block_length; \
			GST_BUFFER_OFFSET_END(last_discont_buf) = GST_BUFFER_OFFSET(last_discont_buf) + last_block_length; \
			GST_BUFFER_PTS(last_discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, missing_samples - last_block_length, missing_samples); \
			GST_BUFFER_DURATION(last_discont_buf) = sinkbuf_pts - GST_BUFFER_PTS(last_discont_buf); \
			GST_BUFFER_FLAG_UNSET(last_discont_buf, GST_BUFFER_FLAG_DISCONT); \
			if(element->insert_gap) \
				GST_BUFFER_FLAG_SET(last_discont_buf, GST_BUFFER_FLAG_GAP); \
			else \
				GST_BUFFER_FLAG_UNSET(last_discont_buf, GST_BUFFER_FLAG_GAP); \
 \
			/* push buffer downstream */ \
			GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(last_discont_buf)); \
			result = gst_pad_push(element->srcpad, last_discont_buf); \
			if(G_UNLIKELY(result != GST_FLOW_OK)) { \
				GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result)); \
				goto done; \
			} \
		} \
		element->discont_offset += missing_samples; \
	} \
	if(!sinkbuf_dur) \
		goto done; \
 \
	element->discont_time = 0; \
	element->empty_bufs = 0; \
 \
	/*
	 * Now, use data on input buffer to make next output buffer(s)
	 */ \
	gboolean data_is_bad, srcbuf_gap, srcbuf_gap_next; \
	guint64 offset, current_srcbuf_length; \
	current_srcbuf_length = 0; \
 \
	/* compute length of incoming buffer and maximum block length in samples */ \
	blocks = (sinkbuf_dur + element->block_duration - 1) / element->block_duration; /* ceil */ \
	g_assert_cmpuint(blocks, >, 0); /* make sure that the sinkbuf is not zero length */ \
	guint64 length = sinkbuf_offset_end - sinkbuf_offset; \
	max_block_length = (length + blocks - 1) / blocks; /* ceil */ \
	g_assert_cmpuint(max_block_length, >, 0); \
 \
	/* Check first sample */ \
	data_is_bad = !check_data_ ## DTYPE((DTYPE *) indata, element->bad_data_intervals, element->array_length, (1 + (int) complex_data) * element->channels, element->remove_nan, element->remove_inf); \
	srcbuf_gap = (sinkbuf_gap && (!(element->remove_gap))) || ((element->insert_gap) && data_is_bad); \
	for(offset = 0; offset < length; offset++) { \
		data_is_bad = !check_data_ ## DTYPE((DTYPE *) indata, element->bad_data_intervals, element->array_length, (1 + (int) complex_data) * element->channels, element->remove_nan, element->remove_inf); \
		srcbuf_gap_next = (sinkbuf_gap && (!(element->remove_gap))) || ((element->insert_gap) && data_is_bad); \
		if(complex_data) \
			*outdata = (((element->replace_value) < G_MAXDOUBLE) && data_is_bad) ? ((DTYPE) (element->replace_value)) * (1 + I) : *indata; \
		else \
			*outdata = (((element->replace_value) < G_MAXDOUBLE) && data_is_bad) ? (DTYPE) (element->replace_value) : *indata; \
		current_srcbuf_length++; \
		indata += element->channels; \
		outdata += element->channels; \
 \
		/* 
		 * We need to push an output buffer if:
		 * 1) The number of samples to be output equals the maximum output buffer size
		 * 2) We have reached the end of the input buffer 
		 * 3) The output data changes from non-gap to gap or vice-versa
		 */ \
		if((current_srcbuf_length >= max_block_length) || (offset >= length - 1) || (srcbuf_gap_next != srcbuf_gap)) { \
			if((current_srcbuf_length > max_block_length) || (offset > length - 1)) \
				g_assert_not_reached(); \
			if(srcbuf_gap_next != srcbuf_gap) { \
				/*
				 * In this case, we don't want the most recent sample since its
				 * gap state is different. Put it on the next output buffer.
				 */ \
				offset--; \
				current_srcbuf_length--; \
				indata -= element->channels; \
				outdata -= element->channels; \
			} \
			GstBuffer *srcbuf; \
			DTYPE COMPLEX *srcbuf_data; \
			srcbuf_data = g_malloc(current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \
			memcpy(srcbuf_data, (outdata - current_srcbuf_length * element->channels), current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \
			srcbuf = gst_buffer_new_wrapped(srcbuf_data, current_srcbuf_length * element->channels * sizeof(DTYPE COMPLEX)); \
 \
			if(G_UNLIKELY(!srcbuf)) { \
				GST_ERROR_OBJECT(element, "failure creating sub-buffer"); \
				result = GST_FLOW_ERROR; \
				goto done; \
			} \
 \
			/* set flags, caps, offset, and timestamps. */ \
			GST_BUFFER_OFFSET(srcbuf) = sinkbuf_offset + element->discont_offset + offset + 1 - current_srcbuf_length; \
			GST_BUFFER_OFFSET_END(srcbuf) = GST_BUFFER_OFFSET(srcbuf) + current_srcbuf_length; \
			GST_BUFFER_PTS(srcbuf) = sinkbuf_pts + gst_util_uint64_scale_int_round(sinkbuf_dur, offset + 1 - current_srcbuf_length, length); \
			GST_BUFFER_DURATION(srcbuf) = sinkbuf_pts + gst_util_uint64_scale_int_round(sinkbuf_dur, offset + 1, length) - GST_BUFFER_PTS(srcbuf); \
			if(srcbuf_gap) \
				GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_GAP); \
			else \
				GST_BUFFER_FLAG_UNSET(srcbuf, GST_BUFFER_FLAG_GAP); \
 \
			/*
			 * only the first subbuffer of a buffer flagged as a
			 * discontinuity is a discontinuity.
			 */ \
			if(sinkbuf_discont && (offset + 1 - current_srcbuf_length == 0) && ((!(element->fill_discont)) || (element->last_sinkbuf_ets == 0))) \
				GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_DISCONT); \
			else \
				GST_BUFFER_FLAG_UNSET(srcbuf, GST_BUFFER_FLAG_DISCONT); \
			if(srcbuf_gap_next != srcbuf_gap) { \
				/* We need to reset our place in the input buffer */ \
				offset++; \
				indata += element->channels; \
				outdata += element->channels; \
				current_srcbuf_length = 1; \
				srcbuf_gap = srcbuf_gap_next; \
			} else \
				current_srcbuf_length = 0; \
 \
			/* push buffer downstream */ \
			GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(srcbuf)); \
			result = gst_pad_push(element->srcpad, srcbuf); \
			if(G_UNLIKELY(result != GST_FLOW_OK)) { \
				GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result)); \
				goto done; \
			} \
		} \
	} \
done: \
	element->last_sinkbuf_ets = sinkbuf_pts + sinkbuf_dur; \
	element->last_sinkbuf_offset_end = sinkbuf_offset_end ? sinkbuf_offset_end : element->last_sinkbuf_offset_end; \
	return result; \
}


DEFINE_PROCESS_INBUF(guint32, )
DEFINE_PROCESS_INBUF(float, )
DEFINE_PROCESS_INBUF(double, )
DEFINE_PROCESS_INBUF(float,complex)
DEFINE_PROCESS_INBUF(double,complex)


/*
 * ============================================================================
 *
 *				     Sink Pad
 *
 * ============================================================================
 */


/*
 * sink_event()
 */


static gboolean sink_event(GstPad *pad, GstObject *parent, GstEvent *event)
{
	GSTLALInsertGap *element = GSTLAL_INSERTGAP(parent);
	gboolean success = TRUE;
	GST_DEBUG_OBJECT(pad, "Got %s event on sink pad", GST_EVENT_TYPE_NAME(event));

	/* caps events carry the stream parameters we need to record; all other events are simply forwarded */
	if(GST_EVENT_TYPE(event) == GST_EVENT_CAPS) {
		GstCaps *caps;
		GstAudioInfo info;
		GstStructure *structure;
		const gchar *format;

		gst_event_parse_caps(event, &caps);
		success &= gstlal_audio_info_from_caps(&info, caps);
		structure = gst_caps_get_structure(caps, 0);
		format = gst_structure_get_string(structure, "format");
		success &= (format != NULL);

		if(success) {
			/* record stream parameters */
			element->rate = GST_AUDIO_INFO_RATE(&info);
			element->channels = GST_AUDIO_INFO_CHANNELS(&info);
			element->unit_size = GST_AUDIO_INFO_BPF(&info);

			/* map the format name to our data type and sanity-check the frame size against it */
			if(strcmp(format, GST_AUDIO_NE(U32)) == 0) {
				element->data_type = GSTLAL_INSERTGAP_U32;
				g_assert_cmpuint(element->unit_size, ==, 4);
			} else if(strcmp(format, GST_AUDIO_NE(F32)) == 0) {
				element->data_type = GSTLAL_INSERTGAP_F32;
				g_assert_cmpuint(element->unit_size, ==, 4);
			} else if(strcmp(format, GST_AUDIO_NE(F64)) == 0) {
				element->data_type = GSTLAL_INSERTGAP_F64;
				g_assert_cmpuint(element->unit_size, ==, 8);
			} else if(strcmp(format, GST_AUDIO_NE(Z64)) == 0) {
				element->data_type = GSTLAL_INSERTGAP_Z64;
				g_assert_cmpuint(element->unit_size, ==, 8);
			} else if(strcmp(format, GST_AUDIO_NE(Z128)) == 0) {
				element->data_type = GSTLAL_INSERTGAP_Z128;
				g_assert_cmpuint(element->unit_size, ==, 16);
			} else
				g_assert_not_reached();
		}
	}

	/* on success hand the event to the default handler, otherwise drop it and report failure */
	if(success)
		return gst_pad_event_default(pad, parent, event);

	gst_event_unref(event);
	return success;
}


/*
 * chain()
 */


/*
 * Chain function of the sink pad.  Takes ownership of sinkbuf and releases
 * (or passes on) that reference on every path.
 *
 * 1) If chop-length is set, input that ends at or before t0 (timestamp of
 *    the first input buffer plus chop-length) is discarded and a
 *    zero-length buffer stamped t0 is pushed instead; a buffer straddling
 *    t0 is trimmed so that it starts at t0.
 * 2) A buffer lacking valid timestamp/duration/offsets, or a zero-length
 *    buffer when fill-discont is off, is passed on flagged as a gap.
 * 3) With fill-discont on, a zero-length buffer later than the recorded end
 *    of the previous input is used to fill the discontinuity up to its
 *    timestamp; any other zero-length buffer is dropped.
 * 4) Otherwise the samples are handed to the process_inbuf_*() variant
 *    matching the negotiated data type.
 *
 * Returns the flow return of the last push, or GST_FLOW_ERROR if the input
 * buffer cannot be mapped.
 */
static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
{
	GSTLALInsertGap *element = GSTLAL_INSERTGAP(parent);
	GstFlowReturn result = GST_FLOW_OK;
	GST_DEBUG_OBJECT(element, "received %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(sinkbuf));

	if(GST_BUFFER_PTS_IS_VALID(sinkbuf)) {
		/* Set the timestamp of the first output sample */
		if(element->t0 == GST_CLOCK_TIME_NONE)
			element->t0 = GST_BUFFER_PTS(sinkbuf) + element->chop_length;

		/* If we are throwing away any initial data, do it now, and send a zero-length buffer downstream to let other elements know when to expect the first buffer */
		if(element->chop_length && GST_BUFFER_PTS(sinkbuf) + GST_BUFFER_DURATION(sinkbuf) <= element->t0) {
			GstBuffer *srcbuf = gst_buffer_new();
			GST_BUFFER_OFFSET(srcbuf) = GST_BUFFER_OFFSET(sinkbuf) + gst_util_uint64_scale_round(element->t0 - GST_BUFFER_PTS(sinkbuf), (guint64) element->rate, 1000000000);
			GST_BUFFER_OFFSET_END(srcbuf) = GST_BUFFER_OFFSET(srcbuf);
			GST_BUFFER_PTS(srcbuf) = element->t0;
			GST_BUFFER_DURATION(srcbuf) = 0;
			result = gst_pad_push(element->srcpad, srcbuf);
			gst_buffer_unref(sinkbuf);
			goto done;
		} else if(GST_BUFFER_PTS(sinkbuf) < element->t0) {
			/* trim the part of the buffer that precedes t0 */
			guint64 size_removed = element->unit_size * gst_util_uint64_scale_round(element->t0 - GST_BUFFER_PTS(sinkbuf), (guint64) element->rate, 1000000000);
			guint64 time_removed = gst_util_uint64_scale_round(size_removed / element->unit_size, 1000000000, (guint64) element->rate);
			guint64 newsize = element->unit_size * (GST_BUFFER_OFFSET_END(sinkbuf) - GST_BUFFER_OFFSET(sinkbuf)) - size_removed;

			/* resizing and re-stamping require a writable buffer; this may replace sinkbuf with a copy */
			sinkbuf = gst_buffer_make_writable(sinkbuf);
			gst_buffer_resize(sinkbuf, size_removed, newsize);
			GST_BUFFER_OFFSET(sinkbuf) = GST_BUFFER_OFFSET(sinkbuf) + size_removed / element->unit_size;
			GST_BUFFER_PTS(sinkbuf) = GST_BUFFER_PTS(sinkbuf) + time_removed;
			GST_BUFFER_DURATION(sinkbuf) = GST_BUFFER_DURATION(sinkbuf) - time_removed;
		}
	}

	/* if buffer does not possess valid metadata or is zero length and we are not filling in discontinuities, push gap downstream */
	if(!(GST_BUFFER_PTS_IS_VALID(sinkbuf) && GST_BUFFER_DURATION_IS_VALID(sinkbuf) && GST_BUFFER_OFFSET_IS_VALID(sinkbuf) && GST_BUFFER_OFFSET_END_IS_VALID(sinkbuf)) || (!element->fill_discont && (GST_BUFFER_DURATION(sinkbuf) == 0 || GST_BUFFER_OFFSET(sinkbuf) == GST_BUFFER_OFFSET_END(sinkbuf)))) {
		GST_DEBUG_OBJECT(element, "pushing gap buffer at timestamp %lu seconds", (long unsigned) GST_TIME_AS_SECONDS(GST_BUFFER_PTS(sinkbuf)));
		/* reuse the input buffer (copied only if it is shared) so that our reference is handed downstream instead of leaked */
		sinkbuf = gst_buffer_make_writable(sinkbuf);
		GST_BUFFER_FLAG_SET(sinkbuf, GST_BUFFER_FLAG_GAP);
		result = gst_pad_push(element->srcpad, sinkbuf);
		if(G_UNLIKELY(result != GST_FLOW_OK))
			GST_WARNING_OBJECT(element, "push failed: %s", gst_flow_get_name(result));
		goto done;
	}

	/* if buffer is zero length and we are filling in discontinuities, fill it in, unless it has no valid timestamp. */
	if(element->fill_discont && (GST_BUFFER_DURATION(sinkbuf) == 0 || GST_BUFFER_OFFSET(sinkbuf) == GST_BUFFER_OFFSET_END(sinkbuf))) {
		if(GST_BUFFER_PTS_IS_VALID(sinkbuf) && GST_BUFFER_PTS(sinkbuf) > element->last_sinkbuf_ets) {
			/* no sample data is passed: a zero duration makes process_inbuf_*() fill the discontinuity only */
			switch(element->data_type) {
			case GSTLAL_INSERTGAP_U32:
				result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE);
				break;
			case GSTLAL_INSERTGAP_F32:
				result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE);
				break;
			case GSTLAL_INSERTGAP_F64:
				result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE);
				break;
			case GSTLAL_INSERTGAP_Z64:
				result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE);
				break;
			case GSTLAL_INSERTGAP_Z128:
				result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE);
				break;
			default:
				g_assert_not_reached();
			}
		} else
			GST_DEBUG_OBJECT(element, "dropping zero length buffer at timestamp %lu seconds", (long unsigned) GST_TIME_AS_SECONDS(GST_BUFFER_PTS(sinkbuf)));

		/* the input buffer itself is never forwarded on this path */
		gst_buffer_unref(sinkbuf);
		goto done;
	}

	GstMapInfo inmap;
	if(G_UNLIKELY(!gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ))) {
		GST_ERROR_OBJECT(element, "failed to map input buffer");
		gst_buffer_unref(sinkbuf);
		result = GST_FLOW_ERROR;
		goto done;
	}

	/* We'll need these to decide gaps, offsets, and timestamps on the outgoing buffer(s) */
	gboolean sinkbuf_gap = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_GAP);
	gboolean sinkbuf_discont = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_DISCONT);
	guint64 sinkbuf_offset = GST_BUFFER_OFFSET(sinkbuf);
	guint64 sinkbuf_offset_end = GST_BUFFER_OFFSET_END(sinkbuf);
	GstClockTime sinkbuf_dur = GST_BUFFER_DURATION(sinkbuf);
	GstClockTime sinkbuf_pts = GST_BUFFER_PTS(sinkbuf);

	g_assert_cmpuint(inmap.size % element->unit_size, ==, 0);
	if(!element->chop_length || sinkbuf_pts > element->t0)
		g_assert_cmpuint((sinkbuf_offset_end - sinkbuf_offset), ==, inmap.size / element->unit_size); /* sanity checks */

	/* outdata will be filled with the data that goes on the outgoing buffer(s) */
	void *outdata;
	outdata = g_malloc((sinkbuf_offset_end - sinkbuf_offset) * element->unit_size);

	switch(element->data_type) {
	case GSTLAL_INSERTGAP_U32:
		result = process_inbuf_guint32((guint32 *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE);
		break;
	case GSTLAL_INSERTGAP_F32:
		result = process_inbuf_float((float *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE);
		break;
	case GSTLAL_INSERTGAP_F64:
		result = process_inbuf_double((double *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE);
		break;
	case GSTLAL_INSERTGAP_Z64:
		result = process_inbuf_floatcomplex((float complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE);
		break;
	case GSTLAL_INSERTGAP_Z128:
		result = process_inbuf_doublecomplex((double complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE);
		break;

	default:
		g_assert_not_reached();
	}

	/* the output buffers hold their own copies, so the scratch area and the input buffer can go */
	g_free(outdata);
	outdata = NULL;
	gst_buffer_unmap(sinkbuf, &inmap);
	gst_buffer_unref(sinkbuf);
	/*
	 * done
	 */

done:
	return result;
}


/*
 * ============================================================================
 *
 *			  GObject Method Overrides
 *
 * ============================================================================
 */


/*
 * properties
 */


/* GObject property IDs (ID 0 is left unused); one per installed property */
enum property {
	ARG_INSERT_GAP = 1,		/* "insert-gap" */
	ARG_REMOVE_GAP = 2,		/* "remove-gap" */
	ARG_REMOVE_NAN = 3,		/* "remove-nan" */
	ARG_REMOVE_INF = 4,		/* "remove-inf" */
	ARG_FILL_DISCONT = 5,		/* "fill-discont" */
	ARG_REPLACE_VALUE = 6,		/* "replace-value" */
	ARG_BAD_DATA_INTERVALS = 7,	/* "bad-data-intervals" */
	ARG_BLOCK_DURATION = 8,		/* "block-duration" */
	ARG_CHOP_LENGTH = 9		/* "chop-length" */
};


/*
 * GObject set_property() implementation.  Stores the new value in the
 * element while holding the object lock.
 *
 * For bad-data-intervals the previous array is freed and replaced.  An
 * array with an odd number of entries is reported as an error and
 * discarded (no intervals are kept), because the data checker consumes
 * the entries in pairs.
 */
static void set_property(GObject *object, enum property prop_id, const GValue *value, GParamSpec *pspec)
{
	GSTLALInsertGap *element = GSTLAL_INSERTGAP(object);

	GST_OBJECT_LOCK(element);
	switch (prop_id) {
	case ARG_INSERT_GAP:
		element->insert_gap = g_value_get_boolean(value);
		break;
	case ARG_REMOVE_GAP:
		element->remove_gap = g_value_get_boolean(value);
		break;
	case ARG_REMOVE_NAN:
		element->remove_nan = g_value_get_boolean(value);
		break;
	case ARG_REMOVE_INF:
		element->remove_inf = g_value_get_boolean(value);
		break;
	case ARG_FILL_DISCONT:
		element->fill_discont = g_value_get_boolean(value);
		break;
	case ARG_REPLACE_VALUE:
		element->replace_value = g_value_get_double(value);
		break;
	case ARG_BAD_DATA_INTERVALS: {
		/* drop the old array first so that pointer and length never disagree */
		g_free(element->bad_data_intervals);
		element->bad_data_intervals = NULL;
		element->array_length = 0;
		element->bad_data_intervals = gstlal_doubles_from_g_value_array(g_value_get_boxed(value), NULL, &element->array_length);
		if(element->array_length % 2) {
			GST_ERROR_OBJECT(element, "Array length for property bad_data_intervals must be even");
			/* an unpaired entry cannot be interpreted; keep no intervals rather than a malformed set */
			g_free(element->bad_data_intervals);
			element->bad_data_intervals = NULL;
			element->array_length = 0;
		}
		break;
	}
	case ARG_BLOCK_DURATION:
		element->block_duration = g_value_get_uint64(value);
		break;
	case ARG_CHOP_LENGTH:
		element->chop_length = g_value_get_uint64(value);
		break;
	default:
		G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
		break;
	}

	GST_OBJECT_UNLOCK(element);
}


/*
 * GObject get_property() implementation.  Copies the current value of the
 * requested property into value while holding the object lock.
 */
static void get_property(GObject *object, enum property prop_id, GValue *value, GParamSpec *pspec)
{
	GSTLALInsertGap *element = GSTLAL_INSERTGAP(object);

	GST_OBJECT_LOCK(element);

	switch (prop_id) {
	/* boolean switches */
	case ARG_INSERT_GAP:	g_value_set_boolean(value, element->insert_gap);	break;
	case ARG_REMOVE_GAP:	g_value_set_boolean(value, element->remove_gap);	break;
	case ARG_REMOVE_NAN:	g_value_set_boolean(value, element->remove_nan);	break;
	case ARG_REMOVE_INF:	g_value_set_boolean(value, element->remove_inf);	break;
	case ARG_FILL_DISCONT:	g_value_set_boolean(value, element->fill_discont);	break;

	/* replacement value and rejection intervals */
	case ARG_REPLACE_VALUE:
		g_value_set_double(value, element->replace_value);
		break;
	case ARG_BAD_DATA_INTERVALS:
		/* the GValue takes ownership of the freshly built array */
		g_value_take_boxed(value, gstlal_g_value_array_from_doubles(element->bad_data_intervals, element->array_length));
		break;

	/* durations in nanoseconds */
	case ARG_BLOCK_DURATION:	g_value_set_uint64(value, element->block_duration);	break;
	case ARG_CHOP_LENGTH:		g_value_set_uint64(value, element->chop_length);	break;

	default:
		G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
		break;
	}

	GST_OBJECT_UNLOCK(element);
}


/*
 * finalize()
 */


/*
 * GObject finalize() implementation:  release the interval array and the
 * pad references held by the element, then chain up to the parent class.
 */
static void finalize(GObject *object)
{
	GSTLALInsertGap *element = GSTLAL_INSERTGAP(object);

	/* property storage */
	g_free(element->bad_data_intervals);
	element->bad_data_intervals = NULL;

	/* pad references taken in the instance init function */
	gst_object_unref(element->sinkpad);
	element->sinkpad = NULL;
	gst_object_unref(element->srcpad);
	element->srcpad = NULL;

	G_OBJECT_CLASS(gstlal_insertgap_parent_class)->finalize(object);
}


/*
 * class_init()
 */


/*
 * Class initializer:  hooks up the GObject virtual methods, sets the
 * element details, registers the pad templates and installs the
 * properties.
 */
static void gstlal_insertgap_class_init(GSTLALInsertGapClass *klass)
{
	GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
	GstElementClass *element_class = GST_ELEMENT_CLASS(klass);
	/* flags shared by every property that is given its default at construction */
	const GParamFlags construct_flags = G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT;

	/* element description */
	gst_element_class_set_details_simple(
		element_class,
		"Replace unwanted data with gaps",
		"Filter",
		"Replace unwanted data, specified with the property bad-data-intervals, with gaps.\n\t\t\t   "
		"Also can replace with another value, given by replace-value. Can also remove gaps\n\t\t\t   "
		"where data is acceptable, and fill in discontinuities if desired.",
		"Aaron Viets <aaron.viets@ligo.org>"
	);

	/* GObject method overrides */
	gobject_class->set_property = GST_DEBUG_FUNCPTR(set_property);
	gobject_class->get_property = GST_DEBUG_FUNCPTR(get_property);
	gobject_class->finalize = GST_DEBUG_FUNCPTR(finalize);

	/* pad templates */
	gst_element_class_add_pad_template(element_class, gst_static_pad_template_get(&src_factory));
	gst_element_class_add_pad_template(element_class, gst_static_pad_template_get(&sink_factory));

	/* boolean switches */
	g_object_class_install_property(gobject_class, ARG_INSERT_GAP, g_param_spec_boolean(
		"insert-gap",
		"Insert gap",
		"If set to true (default), any data fitting the criteria specified by the property\n\t\t\t"
		"bad-data-intervals is replaced with gaps. Also, NaN's and inf's are replaced with\n\t\t\t"
		"gaps if the properties remove-nan and remove-inf are set to true, respectively.",
		TRUE,
		construct_flags
	));

	g_object_class_install_property(gobject_class, ARG_REMOVE_GAP, g_param_spec_boolean(
		"remove-gap",
		"Remove gap",
		"If set to true, any data in an input gap buffer that does not fit the criteria\n\t\t\t"
		"specified by the property bad-data-intervals will be marked as non-gap. If the\n\t\t\t"
		"property insert-gap is false and remove-gap is true, gaps with unacceptable\n\t\t\t"
		"data will be replaced by the value specified by the property replace-value.",
		FALSE,
		construct_flags
	));

	g_object_class_install_property(gobject_class, ARG_REMOVE_NAN, g_param_spec_boolean(
		"remove-nan",
		"Remove NaN",
		"If set to true (default), NaN's in the data stream will be replaced with gaps\n\t\t\t"
		"and/or the replace-value, as specified by user.",
		TRUE,
		construct_flags
	));

	g_object_class_install_property(gobject_class, ARG_REMOVE_INF, g_param_spec_boolean(
		"remove-inf",
		"Remove inf",
		"If set to true (default), infinities in the data stream will be replaced with\n\t\t\t"
		"gaps and/or the replace-value, as specified by user.",
		TRUE,
		construct_flags
	));

	g_object_class_install_property(gobject_class, ARG_FILL_DISCONT, g_param_spec_boolean(
		"fill-discont",
		"Fill discontinuity",
		"If set to true, discontinuities in the data stream will be filled with the\n\t\t\t"
		"replace-value (if set, otherwise 0), and gapped if insert-gap is true.",
		FALSE,
		construct_flags
	));

	/* replacement value; the default of G_MAXDOUBLE is what the processing code treats as "unset" */
	g_object_class_install_property(gobject_class, ARG_REPLACE_VALUE, g_param_spec_double(
		"replace-value",
		"Replace value",
		"If set, this value is used to replace any data that fits the criteria\n\t\t\t"
		"specified by the property bad-data-intervals. If unset, values are not replaced.",
		-G_MAXDOUBLE, G_MAXDOUBLE, G_MAXDOUBLE,
		construct_flags
	));

	/* rejection intervals:  an array of doubles, not set at construction */
	g_object_class_install_property(gobject_class, ARG_BAD_DATA_INTERVALS, g_param_spec_value_array(
		"bad-data-intervals",
		"Bad data intervals",
		"Array containing minima and maxima of closed intervals in which data is\n\t\t\t"
		"considered unacceptable and will be replaced with gaps and/or the replace-value.\n\t\t\t"
		"Array indices 0, 2, 4, etc., represent maxima, and array indices 1, 3, 5, etc.,\n\t\t\t"
		"represent the corresponding minima.",
		g_param_spec_double(
			"coefficient",
			"Coefficient",
			"Coefficient",
			-G_MAXDOUBLE, G_MAXDOUBLE, 0.0,
			G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
		),
		G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
	));

	/* durations, in nanoseconds */
	g_object_class_install_property(gobject_class, ARG_BLOCK_DURATION, g_param_spec_uint64(
		"block-duration",
		"Block duration",
		"Maximum output buffer duration in nanoseconds. Buffers may be smaller than this.\n\t\t\t"
		"Default is to not change buffer length except as required by added/removed gaps.",
		0, G_MAXUINT64, G_MAXUINT64 / 2,
		construct_flags
	));

	g_object_class_install_property(gobject_class, ARG_CHOP_LENGTH, g_param_spec_uint64(
		"chop-length",
		"Chop length",
		"Amount of initial data to throw away before producing output data, in nanoseconds.",
		0, G_MAXUINT64, 0,
		construct_flags
	));
}


/*
 * instance init
 */


/*
 * Instance initializer:  creates the pads from the class templates,
 * installs the sink pad's event and chain handlers, marks both pads as
 * proxying caps/allocation/scheduling, and resets the internal state.
 */
static void gstlal_insertgap_init(GSTLALInsertGap *element)
{
	GstElement *self = GST_ELEMENT(element);

	gst_element_create_all_pads(self);

	/* sink pad:  keep the reference returned by the lookup and attach our handlers */
	element->sinkpad = gst_element_get_static_pad(self, "sink");
	gst_pad_set_event_function(element->sinkpad, GST_DEBUG_FUNCPTR(sink_event));
	gst_pad_set_chain_function(element->sinkpad, GST_DEBUG_FUNCPTR(chain));
	GST_PAD_SET_PROXY_CAPS(element->sinkpad);
	GST_PAD_SET_PROXY_ALLOCATION(element->sinkpad);
	GST_PAD_SET_PROXY_SCHEDULING(element->sinkpad);

	/* src pad:  keep the reference returned by the lookup */
	element->srcpad = gst_element_get_static_pad(self, "src");
	GST_PAD_SET_PROXY_CAPS(element->srcpad);
	GST_PAD_SET_PROXY_ALLOCATION(element->srcpad);
	GST_PAD_SET_PROXY_SCHEDULING(element->srcpad);

	/* stream parameters, filled in when caps arrive */
	element->rate = 0;
	element->channels = 0;
	element->unit_size = 0;

	/* rejection intervals, none until the property is set */
	element->bad_data_intervals = NULL;
	element->array_length = 0;

	/* timestamp and discontinuity bookkeeping */
	element->t0 = GST_CLOCK_TIME_NONE;
	element->last_sinkbuf_ets = 0;
	element->last_sinkbuf_offset_end = 0;
	element->discont_offset = 0;
	element->discont_time = 0;
	element->empty_bufs = 0;
}