diff --git a/gstlal-calibration/gst/lal/gstlal_insertgap.c b/gstlal-calibration/gst/lal/gstlal_insertgap.c index ef719bb8546835844dd40c494f9cf85c3f971abb..614e9f4bb6e761a67487d7d8ffb12efbbb8d62a7 100644 --- a/gstlal-calibration/gst/lal/gstlal_insertgap.c +++ b/gstlal-calibration/gst/lal/gstlal_insertgap.c @@ -58,9 +58,13 @@ */ +#define _XOPEN_SOURCE #include <math.h> #include <string.h> #include <complex.h> +#include <time.h> +#include <pthread.h> +#include <unistd.h> /* @@ -82,6 +86,7 @@ #include <gstlal/gstlal_debug.h> #include <gstlal/gstlal_audio_info.h> #include <gstlal_insertgap.h> +#include <lal/Date.h> /* @@ -399,6 +404,72 @@ DEFINE_PROCESS_INBUF(float,complex) DEFINE_PROCESS_INBUF(double,complex) +static void *input_buffer_timer(void *void_element) { + + /* Give the element a few seconds to get started before barfing out fake data */ + sleep(5); + + GSTLALInsertGap *element = GSTLAL_INSERTGAP(void_element); + + while(TRUE) { + /* + * When element->wait_time > 0, this function runs continuously on a + * separate thread, checking the timestamps of input buffers, + * comparing them against real time. If the latest timestamp in the + * input data is more than the wait time behind real time, we will + * push buffers from the source pad. + */ + + /* don't hog a billion CPUs */ + sleep(1); + + /* Get the current real time as a string */ + GstDateTime *current_gst_time = gst_date_time_new_now_utc(); + gchar *current_utc_time = gst_date_time_to_iso8601_string(current_gst_time); + + /* parse DateTime to gps time */ + struct tm tm; + strptime(current_utc_time, "%Y-%m-%dT%H:%M:%SZ", &tm); + + /* time in nanoseconds */ + guint64 current_time = (guint64) XLALUTCToGPS(&tm) * 1000000000 + (guint64) gst_date_time_get_microsecond(current_gst_time) * 1000; + + if(element->last_sinkbuf_ets + element->wait_time < current_time) { + /* Then we need to push a buffer */ + g_mutex_lock(&element->mutex); + GstFlowReturn result; + switch(element->data_type) { + case GSTLAL_INSERTGAP_U32: + result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE); + break; + case GSTLAL_INSERTGAP_F32: + result = process_inbuf_float(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE); + break; + case GSTLAL_INSERTGAP_F64: + result = process_inbuf_double(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, FALSE); + break; + case GSTLAL_INSERTGAP_Z64: + result = process_inbuf_floatcomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, TRUE); + break; + case GSTLAL_INSERTGAP_Z128: + result = process_inbuf_doublecomplex(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, current_time - element->wait_time, TRUE); + break; + default: + g_assert_not_reached(); + } + + if(G_UNLIKELY(result != GST_FLOW_OK)) + GST_WARNING_OBJECT(element, "push failed in function input_buffer_timer(): %s", gst_flow_get_name(result)); + + g_mutex_unlock(&element->mutex); + } + + if(element->finished_running) + pthread_exit(NULL); + } +} + + /* * ============================================================================ * @@ -480,8 +551,19 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) if(GST_BUFFER_PTS_IS_VALID(sinkbuf)) { /* Set the timestamp of the first output sample) */ - if(element->t0 == GST_CLOCK_TIME_NONE) + if(element->t0 == GST_CLOCK_TIME_NONE) { element->t0 = GST_BUFFER_PTS(sinkbuf) + element->chop_length; + if(element->wait_time > 0) { + /* + * Start monitoring the arrival of new buffers. + * Fill in data if they are late. + */ + pthread_t *thread = g_malloc(sizeof(pthread_t)); + int failure = pthread_create(thread, NULL, input_buffer_timer, (void *) element); + if (failure) + GST_ERROR_OBJECT(element, "Return value of pthread_create() is %d", failure); + } + } /* If we are throwing away any initial data, do it now, and send a zero-length buffer downstream to let other elements know when to expect the first buffer */ if(element->chop_length && GST_BUFFER_PTS(sinkbuf) + GST_BUFFER_DURATION(sinkbuf) <= element->t0) { @@ -519,6 +601,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) /* if buffer is zero length and we are filling in discontinuities, fill it in, unless it has no valid timestamp. */ if(element->fill_discont && (GST_BUFFER_DURATION(sinkbuf) == 0 || GST_BUFFER_OFFSET(sinkbuf) == GST_BUFFER_OFFSET_END(sinkbuf))) { if(GST_BUFFER_PTS_IS_VALID(sinkbuf) && GST_BUFFER_PTS(sinkbuf) > element->last_sinkbuf_ets) { + g_mutex_lock(&element->mutex); switch(element->data_type) { case GSTLAL_INSERTGAP_U32: result = process_inbuf_guint32(NULL, NULL, element, TRUE, TRUE, 0, 0, 0, GST_BUFFER_PTS(sinkbuf), FALSE); @@ -538,6 +621,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) default: g_assert_not_reached(); } + g_mutex_unlock(&element->mutex); } else { GST_DEBUG_OBJECT(element, "dropping zero length buffer at timestamp %lu seconds", (long unsigned) GST_TIME_AS_SECONDS(GST_BUFFER_PTS(sinkbuf))); gst_buffer_unref(sinkbuf); @@ -545,8 +629,7 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) goto done; } - GstMapInfo inmap; - gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ); + g_mutex_lock(&element->mutex); /* We'll need these to decide gaps, offsets, and timestamps on the outgoing buffer(s) */ gboolean sinkbuf_gap = GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_GAP); @@ -556,39 +639,47 @@ static GstFlowReturn chain(GstPad *pad, GstObject *parent, GstBuffer *sinkbuf) GstClockTime sinkbuf_dur = GST_BUFFER_DURATION(sinkbuf); GstClockTime sinkbuf_pts = GST_BUFFER_PTS(sinkbuf); - g_assert_cmpuint(inmap.size % element->unit_size, ==, 0); - if(!element->chop_length || sinkbuf_pts > element->t0) - g_assert_cmpuint((sinkbuf_offset_end - sinkbuf_offset), ==, inmap.size / element->unit_size); /* sanity checks */ - - /* outdata will be filled with the data that goes on the outgoing buffer(s) */ - void *outdata; - outdata = g_malloc((sinkbuf_offset_end - sinkbuf_offset) * element->unit_size); - - switch(element->data_type) { - case GSTLAL_INSERTGAP_U32: - result = process_inbuf_guint32((guint32 *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); - break; - case GSTLAL_INSERTGAP_F32: - result = process_inbuf_float((float *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); - break; - case GSTLAL_INSERTGAP_F64: - result = process_inbuf_double((double *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); - break; - case GSTLAL_INSERTGAP_Z64: - result = process_inbuf_floatcomplex((float complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); - break; - case GSTLAL_INSERTGAP_Z128: - result = process_inbuf_doublecomplex((double complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); - break; + if(sinkbuf_pts >= element->last_sinkbuf_ets) { + GstMapInfo inmap; + gst_buffer_map(sinkbuf, &inmap, GST_MAP_READ); + + g_assert_cmpuint(inmap.size % element->unit_size, ==, 0); + if(!element->chop_length || sinkbuf_pts > element->t0) + g_assert_cmpuint((sinkbuf_offset_end - sinkbuf_offset), ==, inmap.size / element->unit_size); /* sanity checks */ + + /* outdata will be filled with the data that goes on the outgoing buffer(s) */ + void *outdata; + outdata = g_malloc((sinkbuf_offset_end - sinkbuf_offset) * element->unit_size); + + switch(element->data_type) { + case GSTLAL_INSERTGAP_U32: + result = process_inbuf_guint32((guint32 *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); + break; + case GSTLAL_INSERTGAP_F32: + result = process_inbuf_float((float *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); + break; + case GSTLAL_INSERTGAP_F64: + result = process_inbuf_double((double *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, FALSE); + break; + case GSTLAL_INSERTGAP_Z64: + result = process_inbuf_floatcomplex((float complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); + break; + case GSTLAL_INSERTGAP_Z128: + result = process_inbuf_doublecomplex((double complex *) inmap.data, outdata, element, sinkbuf_gap, sinkbuf_discont, sinkbuf_offset, sinkbuf_offset_end, sinkbuf_dur, sinkbuf_pts, TRUE); + break; + + default: + g_assert_not_reached(); + } - default: - g_assert_not_reached(); + g_free(outdata); + outdata = NULL; + gst_buffer_unmap(sinkbuf, &inmap); } - g_free(outdata); - outdata = NULL; - gst_buffer_unmap(sinkbuf, &inmap); gst_buffer_unref(sinkbuf); + g_mutex_unlock(&element->mutex); + /* * done */ @@ -621,7 +712,8 @@ enum property { ARG_REPLACE_VALUE, ARG_BAD_DATA_INTERVALS, ARG_BLOCK_DURATION, - ARG_CHOP_LENGTH + ARG_CHOP_LENGTH, + ARG_WAIT_TIME }; @@ -671,6 +763,9 @@ static void set_property(GObject *object, enum property prop_id, const GValue *v case ARG_CHOP_LENGTH: element->chop_length = g_value_get_uint64(value); break; + case ARG_WAIT_TIME: + element->wait_time = g_value_get_uint64(value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -726,6 +821,9 @@ static void get_property(GObject *object, enum property prop_id, GValue *value, case ARG_CHOP_LENGTH: g_value_set_uint64(value, element->chop_length); break; + case ARG_WAIT_TIME: + g_value_set_uint64(value, element->wait_time); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -751,6 +849,10 @@ static void finalize(GObject *object) g_free(element->bad_data_intervals); element->bad_data_intervals = NULL; + /* Needed for thread used to time input buffers */ + element->finished_running = TRUE; + g_mutex_clear(&element->mutex); + G_OBJECT_CLASS(gstlal_insertgap_parent_class)->finalize(object); } @@ -908,6 +1010,21 @@ static void gstlal_insertgap_class_init(GSTLALInsertGapClass *klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT ) ); + + g_object_class_install_property( + gobject_class, + ARG_WAIT_TIME, + g_param_spec_uint64( + "wait-time", + "Wait Time", + "If nonzero, timestamps of input buffers are tracked and compared to real time.\n\t\t\t" + "If the latest input timestamp is more than the wait-time (in nanoseconds) behind\n\t\t\t" + "real time, output buffers are filled with the replace-value and pushed\n\t\t\t" + "downstream. Default is to disable.", + 0, G_MAXUINT64, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT + ) + ); } @@ -939,6 +1056,7 @@ static void gstlal_insertgap_init(GSTLALInsertGap *element) element->srcpad = pad; /* internal data */ + g_mutex_init(&element->mutex); element->t0 = GST_CLOCK_TIME_NONE; element->bad_data_intervals = NULL; element->array_length = 0; @@ -950,4 +1068,7 @@ static void gstlal_insertgap_init(GSTLALInsertGap *element) element->discont_offset = 0; element->discont_time = 0; element->empty_bufs = 0; + + /* Needed for thread used to time input buffers */ + element->finished_running = FALSE; } diff --git a/gstlal-calibration/gst/lal/gstlal_insertgap.h b/gstlal-calibration/gst/lal/gstlal_insertgap.h index df213921cf33a3a4a82ea3a64ae6c9c047ba3f9f..1707ea5d069f15d2135d9ad9c998945f539c43c7 100644 --- a/gstlal-calibration/gst/lal/gstlal_insertgap.h +++ b/gstlal-calibration/gst/lal/gstlal_insertgap.h @@ -60,12 +60,10 @@ struct _GSTLALInsertGap { GstElement element; /* pads */ - GstPad *sinkpad; GstPad *srcpad; /* stream parameters */ - gint rate; gint channels; gint unit_size; @@ -82,13 +80,13 @@ struct _GSTLALInsertGap { guint64 discont_offset; guint64 discont_time; guint64 empty_bufs; + GMutex mutex; + gboolean finished_running; /* timestamp bookkeeping */ - GstClockTime t0; /* properties */ - gboolean insert_gap; gboolean remove_gap; gboolean remove_nan; @@ -99,6 +97,7 @@ struct _GSTLALInsertGap { gint array_length; guint64 chop_length; GstClockTime block_duration; + guint64 wait_time; }; diff --git a/gstlal-calibration/python/calibration_parts.py b/gstlal-calibration/python/calibration_parts.py index 90812b2f3bfbfcc52a0745c5b45b9f9fd4d7bb6b..687e30d4951d711616b02e6b41f0c09f7481fd1d 100644 --- a/gstlal-calibration/python/calibration_parts.py +++ b/gstlal-calibration/python/calibration_parts.py @@ -1132,7 +1132,7 @@ def compute_exact_kappas_from_filters_file(pipeline, X, freqs, EPICS, rate): print("exact kappas 56") R0 = pipeparts.mktee(pipeline, mkcapsfiltersetter(pipeline, pipeparts.mkgeneric(pipeline, mkadder(pipeline, [mkpow(pipeline, pipeparts.mkcapsfilter(pipeline, b, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), exponent = 3.0), complex_audioamplify(pipeline, mkmultiplier(pipeline, [pipeparts.mkcapsfilter(pipeline, d, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), mkpow(pipeline, pipeparts.mkcapsfilter(pipeline, a, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), exponent = 2.0)]), 8.0, 0.0), complex_audioamplify(pipeline, mkmultiplier(pipeline, [pipeparts.mkcapsfilter(pipeline, a, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), pipeparts.mkcapsfilter(pipeline, b, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), pipeparts.mkcapsfilter(pipeline, c, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate)]), -4.0, 0.0)]), "creal"), "audio/x-raw,format=F64LE,channel-mask=(bitmask)0x0,channels=1")) print("exact kappas 57") - R0sign = pipeparts.mktee(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkmatrixmixer(pipeline, mkmultiplier(pipeline, [pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "creal"), mkpow(pipeline, pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=Z128LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "cabs"), exponent = -1.0)]), matrix = [[1.0, 0.0]]))) + R0sign = pipeparts.mktee(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkmatrixmixer(pipeline, mkmultiplier(pipeline, [pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=F64LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "creal"), mkpow(pipeline, pipeparts.mkgeneric(pipeline, pipeparts.mkcapsfilter(pipeline, R0, "audio/x-raw,format=F64LE,rate=%d,channel-mask=(bitmask)0x0,channels=1" % rate), "cabs"), exponent = -1.0)]), matrix = [[1.0, 0.0]]))) print("exact kappas 60") # Finally, compute kappa_C