Commit dceb31f5 authored by Aaron Viets's avatar Aaron Viets

lal_insertgap: Work in progress. Committing now because I can't access...

lal_insertgap:  Work in progress.  Committing now because I can't access shared memory on LLO for testing at the moment.
parent 2a899da9
Pipeline #73685 failed with stages
in 1 minute and 7 seconds
...@@ -149,6 +149,28 @@ G_DEFINE_TYPE_WITH_CODE( ...@@ -149,6 +149,28 @@ G_DEFINE_TYPE_WITH_CODE(
*/ */
/*
* set the metadata on an output buffer
*/
static void set_metadata(GSTLALInsertGap *element, GstBuffer *buf, guint64 outsamples, gboolean gap, gboolean discont) {
GST_BUFFER_OFFSET(buf) = element->next_out_offset;
element->next_out_offset += outsamples;
GST_BUFFER_OFFSET_END(buf) = element->next_out_offset;
GST_BUFFER_PTS(buf) = element->t0 + gst_util_uint64_scale_int_round(GST_BUFFER_OFFSET(buf) - element->offset0, GST_SECOND, element->rate);
GST_BUFFER_DURATION(buf) = element->t0 + gst_util_uint64_scale_int_round(GST_BUFFER_OFFSET_END(buf) - element->offset0, GST_SECOND, element->rate) - GST_BUFFER_TIMESTAMP(buf);
GST_BUFFER_FLAG_UNSET(buf, GST_BUFFER_FLAG_GAP);
if(discont)
GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_DISCONT);
if(gap)
GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_GAP);
else
GST_BUFFER_FLAG_UNSET(buf, GST_BUFFER_FLAG_GAP);
}
#define DEFINE_CHECK_DATA(DTYPE) \ #define DEFINE_CHECK_DATA(DTYPE) \
static gboolean check_data_ ## DTYPE(DTYPE *data, double *bad_data_intervals, int array_length, int num_checks, gboolean remove_nan, gboolean remove_inf) { \ static gboolean check_data_ ## DTYPE(DTYPE *data, double *bad_data_intervals, int array_length, int num_checks, gboolean remove_nan, gboolean remove_inf) { \
int i, j; \ int i, j; \
...@@ -174,7 +196,7 @@ DEFINE_CHECK_DATA(guint32); ...@@ -174,7 +196,7 @@ DEFINE_CHECK_DATA(guint32);
#define DEFINE_PROCESS_INBUF(DTYPE,COMPLEX) \ #define DEFINE_PROCESS_INBUF(DTYPE,COMPLEX) \
static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *indata, DTYPE COMPLEX *outdata, GSTLALInsertGap *element, gboolean sinkbuf_gap, gboolean sinkbuf_discont, guint64 sinkbuf_offset, guint64 sinkbuf_offset_end, GstClockTime sinkbuf_dur, GstClockTime sinkbuf_pts, gboolean complex_data) \ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *indata, DTYPE COMPLEX *outdata, GSTLALInsertGap *element, gboolean sinkbuf_gap, gboolean sinkbuf_discont, guint64 sinkbuf_offset, guint64 sinkbuf_offset_end, GstClockTime sinkbuf_dur, GstClockTime sinkbuf_pts, gboolean complex_data, gboolean empty_sinkbuf) \
{ \ { \
GstFlowReturn result = GST_FLOW_OK; \ GstFlowReturn result = GST_FLOW_OK; \
guint64 blocks, max_block_length; \ guint64 blocks, max_block_length; \
...@@ -182,14 +204,17 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind ...@@ -182,14 +204,17 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind
/* /*
* First, deal with discontinuity if necessary * First, deal with discontinuity if necessary
*/ \ */ \
if(element->fill_discont && (element->last_sinkbuf_offset_end != 0) && (sinkbuf_pts != element->last_sinkbuf_ets)) { \ if(element->fill_discont && (element->last_sinkbuf_ets != 0) && (sinkbuf_pts != element->last_sinkbuf_ets)) { \
\ \
g_print("Filling Discont!\n"); \
guint64 standard_blocks, last_block_length, buffer_num, sample_num, missing_samples = 0; \ guint64 standard_blocks, last_block_length, buffer_num, sample_num, missing_samples = 0; \
DTYPE COMPLEX sample_value; \ DTYPE COMPLEX sample_value; \
\ \
/* Track discont length and number of zero-length buffers */ \ /* Track discont length and number of zero-length buffers */ \
element->discont_time += (sinkbuf_pts - element->last_sinkbuf_ets); \ element->discont_time += (sinkbuf_pts - element->last_sinkbuf_ets); \
element->empty_bufs += (sinkbuf_dur ? 0 : 1); \ element->empty_bufs += (empty_sinkbuf ? 1 : 0); \
/* If there was no zero-length buffer, but the "duration" passed to this function was zero, then the timer sent this buffer. */ \
element->timeout_bufs += (empty_sinkbuf || sinkbuf_dur ? 0 : 1); \
\ \
/* Find number of missing samples and max block length in samples */ \ /* Find number of missing samples and max block length in samples */ \
missing_samples = gst_util_uint64_scale_int_round(sinkbuf_pts - element->last_sinkbuf_ets, element->rate, 1000000000); \ missing_samples = gst_util_uint64_scale_int_round(sinkbuf_pts - element->last_sinkbuf_ets, element->rate, 1000000000); \
...@@ -199,8 +224,8 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind ...@@ -199,8 +224,8 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind
g_assert_cmpuint(max_block_length, >, 0); \ g_assert_cmpuint(max_block_length, >, 0); \
\ \
/* Message for debugging */ \ /* Message for debugging */ \
if(sinkbuf_dur && sinkbuf_offset != sinkbuf_offset_end) \ if(sinkbuf_dur && (sinkbuf_offset != sinkbuf_offset_end || element->empty_bufs || element->timeout_bufs)) \
GST_WARNING_OBJECT(element, "filling discontinuity lasting %f seconds (%lu samples) including %lu zero-length buffers and starting at %f seconds (offset %lu)", (((double) element->discont_time) / 1000000000.0), gst_util_uint64_scale_int_round(element->discont_time, element->rate, 1000000000), element->empty_bufs, (double) sinkbuf_pts / 1000000000.0 - (double) element->discont_time / 1000000000.0, sinkbuf_offset); \ GST_WARNING_OBJECT(element, "filling discontinuity lasting %f seconds (%lu samples) including %lu zero-length buffers and %lu timeout buffers, starting at %f seconds (offset %lu)", (((double) element->discont_time) / 1000000000.0), gst_util_uint64_scale_int_round(element->discont_time, element->rate, 1000000000), element->empty_bufs, element->timeout_bufs, (double) sinkbuf_pts / 1000000000.0 - (double) element->discont_time / 1000000000.0, sinkbuf_offset); \
\ \
standard_blocks = missing_samples / max_block_length; \ standard_blocks = missing_samples / max_block_length; \
last_block_length = missing_samples % max_block_length; \ last_block_length = missing_samples % max_block_length; \
...@@ -226,16 +251,8 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind ...@@ -226,16 +251,8 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind
goto done; \ goto done; \
} \ } \
\ \
/* set flags, caps, offset, and timestamps. */ \ /* set flags, offsets, and timestamps. */ \
GST_BUFFER_OFFSET(discont_buf) = element->last_sinkbuf_offset_end + element->discont_offset + buffer_num * max_block_length; \ set_metadata(element, discont_buf, max_block_length, element->insert_gap, FALSE); \
GST_BUFFER_OFFSET_END(discont_buf) = GST_BUFFER_OFFSET(discont_buf) + max_block_length; \
GST_BUFFER_PTS(discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, (guint64) buffer_num * max_block_length, missing_samples); \
GST_BUFFER_DURATION(discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, ((guint64) buffer_num + 1) * max_block_length, missing_samples) - GST_BUFFER_PTS(discont_buf); \
GST_BUFFER_FLAG_UNSET(discont_buf, GST_BUFFER_FLAG_DISCONT); \
if(element->insert_gap) \
GST_BUFFER_FLAG_SET(discont_buf, GST_BUFFER_FLAG_GAP); \
else \
GST_BUFFER_FLAG_UNSET(discont_buf, GST_BUFFER_FLAG_GAP); \
\ \
/* push buffer downstream */ \ /* push buffer downstream */ \
GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(discont_buf)); \ GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(discont_buf)); \
...@@ -264,16 +281,8 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind ...@@ -264,16 +281,8 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind
goto done; \ goto done; \
} \ } \
\ \
/* set flags, caps, offset, and timestamps. */ \ /* set flags, offsets, and timestamps. */ \
GST_BUFFER_OFFSET(last_discont_buf) = element->last_sinkbuf_offset_end + element->discont_offset + missing_samples - last_block_length; \ set_metadata(element, last_discont_buf, last_block_length, element->insert_gap, FALSE); \
GST_BUFFER_OFFSET_END(last_discont_buf) = GST_BUFFER_OFFSET(last_discont_buf) + last_block_length; \
GST_BUFFER_PTS(last_discont_buf) = element->last_sinkbuf_ets + gst_util_uint64_scale_round(sinkbuf_pts - element->last_sinkbuf_ets, missing_samples - last_block_length, missing_samples); \
GST_BUFFER_DURATION(last_discont_buf) = sinkbuf_pts - GST_BUFFER_PTS(last_discont_buf); \
GST_BUFFER_FLAG_UNSET(last_discont_buf, GST_BUFFER_FLAG_DISCONT); \
if(element->insert_gap) \
GST_BUFFER_FLAG_SET(last_discont_buf, GST_BUFFER_FLAG_GAP); \
else \
GST_BUFFER_FLAG_UNSET(last_discont_buf, GST_BUFFER_FLAG_GAP); \
\ \
/* push buffer downstream */ \ /* push buffer downstream */ \
GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(last_discont_buf)); \ GST_DEBUG_OBJECT(element, "pushing sub-buffer %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(last_discont_buf)); \
...@@ -283,13 +292,13 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind ...@@ -283,13 +292,13 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind
goto done; \ goto done; \
} \ } \
} \ } \
element->discont_offset += missing_samples; \
} \ } \
if(!sinkbuf_dur) \ if(!sinkbuf_dur) \
goto done; \ goto done; \
\ \
element->discont_time = 0; \ element->discont_time = 0; \
element->empty_bufs = 0; \ element->empty_bufs = 0; \
element->timeout_bufs = 0; \
\ \
/* /*
* Now, use data on input buffer to make next output buffer(s) * Now, use data on input buffer to make next output buffer(s)
...@@ -352,25 +361,15 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind ...@@ -352,25 +361,15 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind
result = GST_FLOW_ERROR; \ result = GST_FLOW_ERROR; \
goto done; \ goto done; \
} \ } \
\
/* set flags, caps, offset, and timestamps. */ \
GST_BUFFER_OFFSET(srcbuf) = sinkbuf_offset + element->discont_offset + offset + 1 - current_srcbuf_length; \
GST_BUFFER_OFFSET_END(srcbuf) = GST_BUFFER_OFFSET(srcbuf) + current_srcbuf_length; \
GST_BUFFER_PTS(srcbuf) = sinkbuf_pts + gst_util_uint64_scale_int_round(sinkbuf_dur, offset + 1 - current_srcbuf_length, length); \
GST_BUFFER_DURATION(srcbuf) = sinkbuf_pts + gst_util_uint64_scale_int_round(sinkbuf_dur, offset + 1, length) - GST_BUFFER_PTS(srcbuf); \
if(srcbuf_gap) \
GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_GAP); \
else \
GST_BUFFER_FLAG_UNSET(srcbuf, GST_BUFFER_FLAG_GAP); \
\ \
/* /*
* only the first subbuffer of a buffer flagged as a * only the first subbuffer of a buffer flagged as a
* discontinuity is a discontinuity. * discontinuity is a discontinuity.
*/ \ */ \
if(sinkbuf_discont && (offset + 1 - current_srcbuf_length == 0) && ((!(element->fill_discont)) || (element->last_sinkbuf_ets == 0))) \ gboolean need_discont = sinkbuf_discont && (offset + 1 - current_srcbuf_length == 0) && ((!(element->fill_discont)) || (element->last_sinkbuf_ets == 0)); \
GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_DISCONT); \ /* set flags, offsets, and timestamps. */ \
else \ set_metadata(element, srcbuf, current_srcbuf_length, srcbuf_gap, need_discont); \
GST_BUFFER_FLAG_UNSET(srcbuf, GST_BUFFER_FLAG_DISCONT); \ \
if(srcbuf_gap_next != srcbuf_gap) { \ if(srcbuf_gap_next != srcbuf_gap) { \
/* We need to reset our place in the input buffer */ \ /* We need to reset our place in the input buffer */ \
offset++; \ offset++; \
...@@ -392,7 +391,6 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind ...@@ -392,7 +391,6 @@ static GstFlowReturn process_inbuf_ ## DTYPE ## COMPLEX(const DTYPE COMPLEX *ind
} \ } \
done: \ done: \
element->last_sinkbuf_ets = sinkbuf_pts + sinkbuf_dur; \ element->last_sinkbuf_ets = sinkbuf_pts + sinkbuf_dur; \
element->last_sinkbuf_offset_end = sinkbuf_offset_end ? sinkbuf_offset_end : element->last_sinkbuf_offset_end; \
return result; \ return result; \
} }
...@@ -421,7 +419,10 @@ static void *input_buffer_timer(void *void_element) { ...@@ -421,7 +419,10 @@ static void *input_buffer_timer(void *void_element) {
*/ */
/* don't hog a billion CPUs */ /* don't hog a billion CPUs */
sleep(1); if(element->block_duration < G_MAXUINT64 / 2)
sleep(element->block_duration / 1000000000.0);
else
sleep(1);
/* Get the current real time as a string */ /* Get the current real time as a string */
GstDateTime *current_gst_time = gst_date_time_new_now_utc(); GstDateTime *current_gst_time = gst_date_time_new_now_utc();
...@@ -431,28 +432,40 @@ static void *input_buffer_timer(void *void_element) { ...@@ -431,28 +432,40 @@ static void *input_buffer_timer(void *void_element) {
struct tm tm; struct tm tm;
strptime(current_utc_time, "%Y-%m-%dT%H:%M:%SZ", &tm); strptime(current_utc_time, "%Y-%m-%dT%H:%M:%SZ", &tm);
/* time in nanoseconds */ /* Time in nanoseconds */
guint64 current_time = (guint64) XLALUTCToGPS(&tm) * 1000000000 + (guint64) gst_date_time_get_microsecond(current_gst_time) * 1000; guint64 current_time = (guint64) XLALUTCToGPS(&tm) * 1000000000 + (guint64) gst_date_time_get_microsecond(current_gst_time) * 1000;
/*
* The minimum allowable current buffer timestamp. If the input buffer
* has an earlier timestamp, we will push output buffers without waiting
* any longer for input.
*/
guint64 ets_min = current_time - element->wait_time;
/* It needs to be a multiple of the sample period. */
ets_min = gst_util_uint64_scale_int_round(gst_util_uint64_scale_int(ets_min, element->rate, 1000000000), 1000000000, element->rate);
if(element->last_sinkbuf_ets + element->wait_time < current_time) { g_mutex_lock(&element->mutex);
/* Then we need to push a buffer */ if((double) ets_min - (double) element->last_sinkbuf_ets + 1000000000.0 - (double) (rand() % 2000000000) > 1000000000.0 / element->rate) {
g_mutex_lock(&element->mutex); g_print("input_buffer_timer going to push a buffer: ets_min=%lu, last_sinkbuf_ets=%lu\n", ets_min, element->last_sinkbuf_ets);
/*
* Then we need to push a buffer. Note the requirement that the
* time is late enough for us to produce at least one full sample.
*/
GstFlowReturn result; GstFlowReturn result;
switch(element->data_type) { switch(element->data_type) {
case GSTLAL_INSERTGAP_U32: case GSTLAL_INSERTGAP_U32:
result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE); result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, ets_min, FALSE, FALSE);
break; break;
case GSTLAL_INSERTGAP_F32: case GSTLAL_INSERTGAP_F32:
result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE); result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, ets_min, FALSE, FALSE);
break; break;
case GSTLAL_INSERTGAP_F64: case GSTLAL_INSERTGAP_F64:
result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE); result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, ets_min, FALSE, FALSE);
break; break;
case GSTLAL_INSERTGAP_Z64: case GSTLAL_INSERTGAP_Z64:
result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, TRUE); result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, ets_min, TRUE, FALSE);
break; break;
case GSTLAL_INSERTGAP_Z128: case GSTLAL_INSERTGAP_Z128:
result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, TRUE); result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, ets_min, TRUE, FALSE);
break; break;
default: default:
g_assert_not_reached(); g_assert_not_reached();
...@@ -461,11 +474,14 @@ static void *input_buffer_timer(void *void_element) { ...@@ -461,11 +474,14 @@ static void *input_buffer_timer(void *void_element) {
if(G_UNLIKELY(result != GST_FLOW_OK)) if(G_UNLIKELY(result != GST_FLOW_OK))
GST_WARNING_OBJECT(element, "push failed in function input_buffer_timer(): %s", gst_flow_get_name(result)); GST_WARNING_OBJECT(element, "push failed in function input_buffer_timer(): %s", gst_flow_get_name(result));
g_mutex_unlock(&element->mutex); g_print("input_buffer_timer pushed a buffer\n");
} }
g_mutex_unlock(&element->mutex);
if(element->finished_running) if(element->finished_running) {
g_print("input_buffer_timer pthread_exit()\n");
pthread_exit(NULL); pthread_exit(NULL);
}
} }
} }
...@@ -552,6 +568,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) ...@@ -552,6 +568,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
if(GST_BUFFER_PTS_IS_VALID(sinkbuf)) { if(GST_BUFFER_PTS_IS_VALID(sinkbuf)) {
/* Set the timestamp of the first output sample) */ /* Set the timestamp of the first output sample) */
if(element->t0 == GST_CLOCK_TIME_NONE) { if(element->t0 == GST_CLOCK_TIME_NONE) {
element->offset0 = element->next_out_offset = GST_BUFFER_OFFSET(sinkbuf) + gst_util_uint64_scale_int_round(element->chop_length, element->rate, GST_SECOND);
element->t0 = GST_BUFFER_PTS(sinkbuf) + element->chop_length; element->t0 = GST_BUFFER_PTS(sinkbuf) + element->chop_length;
if(element->wait_time > 0) { if(element->wait_time > 0) {
/* /*
...@@ -604,19 +621,19 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) ...@@ -604,19 +621,19 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
g_mutex_lock(&element->mutex); g_mutex_lock(&element->mutex);
switch(element->data_type) { switch(element->data_type) {
case GSTLAL_INSERTGAP_U32: case GSTLAL_INSERTGAP_U32:
result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE); result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE, TRUE);
break; break;
case GSTLAL_INSERTGAP_F32: case GSTLAL_INSERTGAP_F32:
result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE); result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE, TRUE);
break; break;
case GSTLAL_INSERTGAP_F64: case GSTLAL_INSERTGAP_F64:
result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE); result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE, TRUE);
break; break;
case GSTLAL_INSERTGAP_Z64: case GSTLAL_INSERTGAP_Z64:
result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE); result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE, TRUE);
break; break;
case GSTLAL_INSERTGAP_Z128: case GSTLAL_INSERTGAP_Z128:
result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE); result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), TRUE, TRUE);
break; break;
default: default:
g_assert_not_reached(); g_assert_not_reached();
...@@ -633,13 +650,22 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) ...@@ -633,13 +650,22 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
/* We'll need these to decide gaps, offsets, and timestamps on the outgoing buffer(s) */ /* We'll need these to decide gaps, offsets, and timestamps on the outgoing buffer(s) */
gboolean sinkbuf_gap = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_GAP); gboolean sinkbuf_gap = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_GAP);
gboolean sinkbuf_discont = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_DISCONT); gboolean sinkbuf_discont = G_UNLIKELY(GST_BUFFER_IS_DISCONT(sinkbuf) || GST_BUFFER_OFFSET(sinkbuf) != element->next_out_offset || !GST_CLOCK_TIME_IS_VALID(element->t0));
guint64 sinkbuf_offset = GST_BUFFER_OFFSET(sinkbuf); guint64 sinkbuf_offset = GST_BUFFER_OFFSET(sinkbuf);
guint64 sinkbuf_offset_end = GST_BUFFER_OFFSET_END(sinkbuf); guint64 sinkbuf_offset_end = GST_BUFFER_OFFSET_END(sinkbuf);
GstClockTime sinkbuf_dur = GST_BUFFER_DURATION(sinkbuf); GstClockTime sinkbuf_dur = GST_BUFFER_DURATION(sinkbuf);
GstClockTime sinkbuf_pts = GST_BUFFER_PTS(sinkbuf); GstClockTime sinkbuf_pts = GST_BUFFER_PTS(sinkbuf);
/* If we're not filling discontinuities, reset the timestamp and offset bookkeeping. */
if(!element->fill_discont && element->t0 != GST_CLOCK_TIME_NONE) {
element->offset0 = element->next_out_offset = GST_BUFFER_OFFSET(sinkbuf);
element->t0 = GST_BUFFER_PTS(sinkbuf);
}
if(sinkbuf_pts >= element->last_sinkbuf_ets) { if(sinkbuf_pts >= element->last_sinkbuf_ets) {
g_print("chain pushing buffer\n");
if(sinkbuf_pts == element->last_sinkbuf_ets)
g_print("chain pushing buffer no discont\n");
GstMapInfo inmap; GstMapInfo inmap;
gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ); gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ);
...@@ -653,19 +679,19 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) ...@@ -653,19 +679,19 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
switch(element->data_type) { switch(element->data_type) {
case GSTLAL_INSERTGAP_U32: case GSTLAL_INSERTGAP_U32:
result = process_inbuf_guint32((guint32 *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); result = process_inbuf_guint32((guint32 *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE, FALSE);
break; break;
case GSTLAL_INSERTGAP_F32: case GSTLAL_INSERTGAP_F32:
result = process_inbuf_float((float *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); result = process_inbuf_float((float *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE, FALSE);
break; break;
case GSTLAL_INSERTGAP_F64: case GSTLAL_INSERTGAP_F64:
result = process_inbuf_double((double *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); result = process_inbuf_double((double *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE, FALSE);
break; break;
case GSTLAL_INSERTGAP_Z64: case GSTLAL_INSERTGAP_Z64:
result = process_inbuf_floatcomplex((float complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); result = process_inbuf_floatcomplex((float complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE, FALSE);
break; break;
case GSTLAL_INSERTGAP_Z128: case GSTLAL_INSERTGAP_Z128:
result = process_inbuf_doublecomplex((double complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); result = process_inbuf_doublecomplex((double complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE, FALSE);
break; break;
default: default:
...@@ -675,6 +701,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) ...@@ -675,6 +701,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
g_free(outdata); g_free(outdata);
outdata = NULL; outdata = NULL;
gst_buffer_unmap(sinkbuf, &inmap); gst_buffer_unmap(sinkbuf, &inmap);
g_print("chain done pushing buffer\n");
} }
gst_buffer_unref(sinkbuf); gst_buffer_unref(sinkbuf);
...@@ -1056,18 +1083,19 @@ static void gstlal_insertgap_init(GSTLALInsertGap *element) ...@@ -1056,18 +1083,19 @@ static void gstlal_insertgap_init(GSTLALInsertGap *element)
element->srcpad = pad; element->srcpad = pad;
/* internal data */ /* internal data */
g_mutex_init(&element->mutex);
element->t0 = GST_CLOCK_TIME_NONE; element->t0 = GST_CLOCK_TIME_NONE;
element->offset0 = GST_BUFFER_OFFSET_NONE;
element->next_out_offset = GST_BUFFER_OFFSET_NONE;
g_mutex_init(&element->mutex);
element->bad_data_intervals = NULL; element->bad_data_intervals = NULL;
element->array_length = 0; element->array_length = 0;
element->rate = 0; element->rate = 0;
element->channels = 0; element->channels = 0;
element->unit_size = 0; element->unit_size = 0;
element->last_sinkbuf_ets = 0; element->last_sinkbuf_ets = 0;
element->last_sinkbuf_offset_end = 0;
element->discont_offset = 0;
element->discont_time = 0; element->discont_time = 0;
element->empty_bufs = 0; element->empty_bufs = 0;
element->timeout_bufs = 0;
/* Needed for thread used to time input buffers */ /* Needed for thread used to time input buffers */
element->finished_running = FALSE; element->finished_running = FALSE;
......
...@@ -75,16 +75,17 @@ struct _GSTLALInsertGap { ...@@ -75,16 +75,17 @@ struct _GSTLALInsertGap {
GSTLAL_INSERTGAP_Z128 GSTLAL_INSERTGAP_Z128
} data_type; } data_type;
guint64 last_sinkbuf_ets;
guint64 last_sinkbuf_offset_end;
guint64 discont_offset;
guint64 discont_time;
guint64 empty_bufs; guint64 empty_bufs;
guint64 timeout_bufs;
GMutex mutex; GMutex mutex;
gboolean finished_running; gboolean finished_running;
/* timestamp bookkeeping */ /* timestamp bookkeeping */
GstClockTime t0; GstClockTime t0;
guint64 offset0;
guint64 next_out_offset;
guint64 last_sinkbuf_ets;
guint64 discont_time;
/* properties */ /* properties */
gboolean insert_gap; gboolean insert_gap;
......
...@@ -1105,6 +1105,11 @@ static void gstlal_typecast_class_init(GSTLALTypeCastClass *klass) { ...@@ -1105,6 +1105,11 @@ static void gstlal_typecast_class_init(GSTLALTypeCastClass *klass) {
static void gstlal_typecast_init(GSTLALTypeCast *element) { static void gstlal_typecast_init(GSTLALTypeCast *element) {
element->t0 = GST_CLOCK_TIME_NONE;
element->offset0 = GST_BUFFER_OFFSET_NONE;
element->next_in_offset = GST_BUFFER_OFFSET_NONE;
element->next_out_offset = GST_BUFFER_OFFSET_NONE;
element->need_discont = TRUE;
element->unit_size_in = 0; element->unit_size_in = 0;
element->unit_size_out = 0; element->unit_size_out = 0;
element->channels = 0; element->channels = 0;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment