Commit 7a734894 authored by Aaron Viets's avatar Aaron Viets
Browse files

lal_insertgap: Added option to time input buffers relative to real time in...

lal_insertgap:  Added option to time input buffers relative to real time in order to avoid falling behind.
parent e7702021
Pipeline #73445 failed with stages
in 2 minutes and 55 seconds
......@@ -58,9 +58,13 @@
*/
#define _XOPEN_SOURCE
#include <math.h>
#include <string.h>
#include <complex.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
/*
......@@ -82,6 +86,7 @@
#include <gstlal/gstlal_debug.h>
#include <gstlal/gstlal_audio_info.h>
#include <gstlal_insertgap.h>
#include <lal/Date.h>
/*
......@@ -399,6 +404,72 @@ DEFINE_PROCESS_INBUF(float,complex)
DEFINE_PROCESS_INBUF(double,complex)
static void *input_buffer_timer(void *void_element) {
/* Give the element a few seconds to get started before barfing out fake data */
sleep(5);
GSTLALInsertGap *element = GSTLAL_INSERTGAP(void_element);
while(TRUE) {
/*
* When element->wait_time > 0, this function runs continuously on a
* separate thread, checking the timestamps of input buffers,
* comparing them against real time. If the latest timestamp in the
* input data is more than the wait time behind real time, we will
* push buffers from the source pad.
*/
/* don't hog a billion CPUs */
sleep(1);
/* Get the current real time as a string */
GstDateTime *current_gst_time = gst_date_time_new_now_utc();
gchar *current_utc_time = gst_date_time_to_iso8601_string(current_gst_time);
/* parse DateTime to gps time */
struct tm tm;
strptime(current_utc_time, "%Y-%m-%dT%H:%M:%SZ", &tm);
/* time in nanoseconds */
guint64 current_time = (guint64) XLALUTCToGPS(&tm) * 1000000000 + (guint64) gst_date_time_get_microsecond(current_gst_time) * 1000;
if(element->last_sinkbuf_ets + element->wait_time < current_time) {
/* Then we need to push a buffer */
g_mutex_lock(&element->mutex);
GstFlowReturn result;
switch(element->data_type) {
case GSTLAL_INSERTGAP_U32:
result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE);
break;
case GSTLAL_INSERTGAP_F32:
result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE);
break;
case GSTLAL_INSERTGAP_F64:
result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE);
break;
case GSTLAL_INSERTGAP_Z64:
result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, TRUE);
break;
case GSTLAL_INSERTGAP_Z128:
result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, TRUE);
break;
default:
g_assert_not_reached();
}
if(G_UNLIKELY(result != GST_FLOW_OK))
GST_WARNING_OBJECT(element, "push failed in function input_buffer_timer(): %s", gst_flow_get_name(result));
g_mutex_unlock(&element->mutex);
}
if(element->finished_running)
pthread_exit(NULL);
}
}
/*
* ============================================================================
*
......@@ -480,8 +551,19 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
if(GST_BUFFER_PTS_IS_VALID(sinkbuf)) {
/* Set the timestamp of the first output sample) */
if(element->t0 == GST_CLOCK_TIME_NONE)
if(element->t0 == GST_CLOCK_TIME_NONE) {
element->t0 = GST_BUFFER_PTS(sinkbuf) + element->chop_length;
if(element->wait_time > 0) {
/*
* Start monitoring the arrival of new buffers.
* Fill in data if they are late.
*/
pthread_t *thread = g_malloc(sizeof(pthread_t));
int failure = pthread_create(thread, NULL, input_buffer_timer, (void *) element);
if (failure)
GST_ERROR_OBJECT(element, "Return value of pthread_create() is %d", failure);
}
}
/* If we are throwing away any initial data, do it now, and send a zero-length buffer downstream to let other elements know when to expect the first buffer */
if(element->chop_length && GST_BUFFER_PTS(sinkbuf) + GST_BUFFER_DURATION(sinkbuf) <= element->t0) {
......@@ -519,6 +601,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
/* if buffer is zero length and we are filling in discontinuities, fill it in, unless it has no valid timestamp. */
if(element->fill_discont && (GST_BUFFER_DURATION(sinkbuf) == 0 || GST_BUFFER_OFFSET(sinkbuf) == GST_BUFFER_OFFSET_END(sinkbuf))) {
if(GST_BUFFER_PTS_IS_VALID(sinkbuf) && GST_BUFFER_PTS(sinkbuf) > element->last_sinkbuf_ets) {
g_mutex_lock(&element->mutex);
switch(element->data_type) {
case GSTLAL_INSERTGAP_U32:
result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE);
......@@ -538,6 +621,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
default:
g_assert_not_reached();
}
g_mutex_unlock(&element->mutex);
} else {
GST_DEBUG_OBJECT(element, "dropping zero length buffer at timestamp %lu seconds", (long unsigned) GST_TIME_AS_SECONDS(GST_BUFFER_PTS(sinkbuf)));
gst_buffer_unref(sinkbuf);
......@@ -545,8 +629,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
goto done;
}
GstMapInfo inmap;
gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ);
g_mutex_lock(&element->mutex);
/* We'll need these to decide gaps, offsets, and timestamps on the outgoing buffer(s) */
gboolean sinkbuf_gap = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_GAP);
......@@ -556,6 +639,10 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
GstClockTime sinkbuf_dur = GST_BUFFER_DURATION(sinkbuf);
GstClockTime sinkbuf_pts = GST_BUFFER_PTS(sinkbuf);
if(sinkbuf_pts >= element->last_sinkbuf_ets) {
GstMapInfo inmap;
gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ);
g_assert_cmpuint(inmap.size % element->unit_size, ==, 0);
if(!element->chop_length || sinkbuf_pts > element->t0)
g_assert_cmpuint((sinkbuf_offset_end - sinkbuf_offset), ==, inmap.size / element->unit_size); /* sanity checks */
......@@ -588,7 +675,11 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf)
g_free(outdata);
outdata = NULL;
gst_buffer_unmap(sinkbuf, &inmap);
}
gst_buffer_unref(sinkbuf);
g_mutex_unlock(&element->mutex);
/*
* done
*/
......@@ -621,7 +712,8 @@ enum property {
ARG_REPLACE_VALUE,
ARG_BAD_DATA_INTERVALS,
ARG_BLOCK_DURATION,
ARG_CHOP_LENGTH
ARG_CHOP_LENGTH,
ARG_WAIT_TIME
};
......@@ -671,6 +763,9 @@ static void set_property(GObject *object, enum property prop_id, const GValue *v
case ARG_CHOP_LENGTH:
element->chop_length = g_value_get_uint64(value);
break;
case ARG_WAIT_TIME:
element->wait_time = g_value_get_uint64(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
......@@ -726,6 +821,9 @@ static void get_property(GObject *object, enum property prop_id, GValue *value,
case ARG_CHOP_LENGTH:
g_value_set_uint64(value, element->chop_length);
break;
case ARG_WAIT_TIME:
g_value_set_uint64(value, element->wait_time);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
......@@ -751,6 +849,10 @@ static void finalize(GObject *object)
g_free(element->bad_data_intervals);
element->bad_data_intervals = NULL;
/* Needed for thread used to time input buffers */
element->finished_running = TRUE;
g_mutex_clear(&element->mutex);
G_OBJECT_CLASS(gstlal_insertgap_parent_class)->finalize(object);
}
......@@ -908,6 +1010,21 @@ static void gstlal_insertgap_class_init(GSTLALInsertGapClass *klass)
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT
)
);
g_object_class_install_property(
gobject_class,
ARG_WAIT_TIME,
g_param_spec_uint64(
"wait-time",
"Wait Time",
"If nonzero, timestamps of input buffers are tracked and compared to real time.\n\t\t\t"
"If the latest input timestamp is more than the wait-time (in nanoseconds) behind\n\t\t\t"
"real time, output buffers are filled with the replace-value and pushed\n\t\t\t"
"downstream. Default is to disable.",
0, G_MAXUINT64, 0,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT
)
);
}
......@@ -939,6 +1056,7 @@ static void gstlal_insertgap_init(GSTLALInsertGap *element)
element->srcpad = pad;
/* internal data */
g_mutex_init(&element->mutex);
element->t0 = GST_CLOCK_TIME_NONE;
element->bad_data_intervals = NULL;
element->array_length = 0;
......@@ -950,4 +1068,7 @@ static void gstlal_insertgap_init(GSTLALInsertGap *element)
element->discont_offset = 0;
element->discont_time = 0;
element->empty_bufs = 0;
/* Needed for thread used to time input buffers */
element->finished_running = FALSE;
}
......@@ -60,12 +60,10 @@ struct _GSTLALInsertGap {
GstElement element;
/* pads */
GstPad *sinkpad;
GstPad *srcpad;
/* stream parameters */
gint rate;
gint channels;
gint unit_size;
......@@ -82,13 +80,13 @@ struct _GSTLALInsertGap {
guint64 discont_offset;
guint64 discont_time;
guint64 empty_bufs;
GMutex mutex;
gboolean finished_running;
/* timestamp bookkeeping */
GstClockTime t0;
/* properties */
gboolean insert_gap;
gboolean remove_gap;
gboolean remove_nan;
......@@ -99,6 +97,7 @@ struct _GSTLALInsertGap {
gint array_length;
guint64 chop_length;
GstClockTime block_duration;
guint64 wait_time;
};
......
......@@ -1132,7 +1132,7 @@ def compute_exact_kappas_from_filters_file(pipeline, X, freqs, EPICS, rate):
print("exact kappas 56")
R0 = pipeparts.mktee(pipeline, mkcapsfiltersetter(pipeline, pipeparts.mkgeneric(pipeline, mkadder(pipeline, [mkpow(pipeline, pipeparts.mkcapsfilter(pipeline, b, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), exponent = 3.0), complex_audioamplify(pipeline, mkmultiplier(pipeline, [pipeparts.mkcapsfilter(pipeline, d, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), mkpow(pipeline, pipeparts.mkcapsfilter(pipeline, a, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), exponent = 2.0)]), 8.0, 0.0), complex_audioamplify(pipeline, mkmultiplier(pipeline, [pipeparts.mkcapsfilter(pipeline, a, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), pipeparts.mkcapsfilter(pipeline, b, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), pipeparts.mkcapsfilter(pipeline, c, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate)]), -4.0, 0.0)]), "creal"), "audio/x-raw,format=F64LE,channel-mask=(bitmask)0x0,channels=1"))
print("exact kappas 57")
R0sign = pipeparts.mktee(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkmatrixmixer(pipeline, mkmultiplier(pipeline, [pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "creal"), mkpow(pipeline, pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "cabs"), exponent = -1.0)]), matrix = [[1.0, 0.0]])))
R0sign = pipeparts.mktee(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkmatrixmixer(pipeline, mkmultiplier(pipeline, [pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=F64LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "creal"), mkpow(pipeline, pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=F64LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "cabs"), exponent = -1.0)]), matrix = [[1.0, 0.0]])))
print("exact kappas 60")
# Finally, compute kappa_C
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment