Skip to content
Snippets Groups Projects
Commit a436ca84 authored by chad.hanna's avatar chad.hanna Committed by Kipp Cannon
Browse files

lvshmsrc.cc: fix for wait time bug, surabhi and patrick confirmed it works

parent d9b3e34e
No related branches found
No related tags found
No related merge requests found
...@@ -68,6 +68,7 @@ ...@@ -68,6 +68,7 @@
#include <gds/lsmp_con.hh> #include <gds/lsmp_con.hh>
#include <gds/tconv.h> #include <gds/tconv.h>
#include <gds/SysError.hh>
/* /*
...@@ -331,7 +332,7 @@ static GstFlowReturn create(GstBaseSrc *basesrc, guint64 offset, guint size, Gst ...@@ -331,7 +332,7 @@ static GstFlowReturn create(GstBaseSrc *basesrc, guint64 offset, guint size, Gst
GstBaseSrcClass *basesrc_class = GST_BASE_SRC_CLASS(G_OBJECT_GET_CLASS(basesrc)); GstBaseSrcClass *basesrc_class = GST_BASE_SRC_CLASS(G_OBJECT_GET_CLASS(basesrc));
GDSLVSHMSrc *element = GDS_LVSHMSRC(basesrc); GDSLVSHMSrc *element = GDS_LVSHMSRC(basesrc);
GstMapInfo mapinfo; GstMapInfo mapinfo;
GstClockTime t_before, t_after; GstClockTime t_before;
int flags = 0; /* LVSHM_NOWAIT is not set = respect wait time */ int flags = 0; /* LVSHM_NOWAIT is not set = respect wait time */
const char *data; const char *data;
unsigned length; unsigned length;
...@@ -356,11 +357,37 @@ static GstFlowReturn create(GstBaseSrc *basesrc, guint64 offset, guint size, Gst ...@@ -356,11 +357,37 @@ static GstFlowReturn create(GstBaseSrc *basesrc, guint64 offset, guint size, Gst
element->create_thread = pthread_self(); element->create_thread = pthread_self();
while(1) { while(1) {
int catch_error = 0;
int save_errno = 0;
g_mutex_lock(&element->create_thread_lock); g_mutex_lock(&element->create_thread_lock);
t_before = GPSNow(); t_before = GPSNow();
data = lsmp_partition(element)->get_buffer(flags); try {
t_after = GPSNow(); /*
* LSMP_CON::get_buffer API: https://bugs.ligo.org/redmine/issues/6225
*/
data = lsmp_partition(element) -> get_buffer(flags);
}
catch (const SysError & e) {
GST_WARNING_OBJECT(element, "Caught SysError description: [%s]", e.what());
catch_error = 1;
}
catch (const std::exception & e) {
GST_WARNING_OBJECT(element, "Caught error with description: [%s]", e.what());
catch_error = 2;
}
save_errno = errno;
g_mutex_unlock(&element->create_thread_lock); g_mutex_unlock(&element->create_thread_lock);
if(catch_error) {
switch(save_errno) {
case EIDRM:
GST_ELEMENT_ERROR(element, RESOURCE, READ, (NULL), ("EIDRM received in gst_buffer(), shared memory partition [%s] possibly removed.", element->name));
return GST_FLOW_EOS;
default:
GST_ELEMENT_ERROR(element, RESOURCE, READ, (NULL), ("Caught error in gst_buffer(), errno: [%d]", save_errno));
return GST_FLOW_EOS;
}
}
if(!data) { if(!data) {
/* /*
* data retrieval failed. guess cause. * data retrieval failed. guess cause.
...@@ -378,48 +405,67 @@ static GstFlowReturn create(GstBaseSrc *basesrc, guint64 offset, guint size, Gst ...@@ -378,48 +405,67 @@ static GstFlowReturn create(GstBaseSrc *basesrc, guint64 offset, guint size, Gst
GST_DEBUG_OBJECT(element, "unlock() called, no buffer created"); GST_DEBUG_OBJECT(element, "unlock() called, no buffer created");
return GST_FLOW_EOS; return GST_FLOW_EOS;
} else if(element->wait_time > 0. && (GstClockTimeDiff) (t_after - t_before) >= (GstClockTimeDiff) (element->wait_time * GST_SECOND)) { }
/* else {
* assume reason for failure was a timeout. switch(save_errno) {
* create a 0-length buffer with a guess as case EAGAIN:
* to the timestamp of the missing data. /*
* guess: the time when we started waiting * assume reason for failure was a timeout.
* for the data adjusted by the most * create a 0-length buffer with a guess as
* recently measured latency * to the timestamp of the missing data.
* * guess: the time when we started waiting
* FIXME: we need an API that can tell us * for the data adjusted by the most
* the timestamp of the missing data, e.g., * recently measured latency
* when we receive data tell us what its *
* duration is so when know how much we've * FIXME: we need an API that can tell us
* received. * the timestamp of the missing data, e.g.,
*/ * when we receive data tell us what its
* duration is so when know how much we've
GST_DEBUG_OBJECT(element, "timeout occured, creating 0-length heartbeat buffer"); * received.
*/
*buffer = gst_buffer_new();
GST_BUFFER_PTS(*buffer) = t_before; GST_WARNING_OBJECT(element, "timeout occurred, creating 0-length heartbeat buffer");
if(GST_CLOCK_TIME_IS_VALID(element->max_latency))
GST_BUFFER_PTS(*buffer) -= element->max_latency; *buffer = gst_buffer_new();
if(GST_BUFFER_PTS(*buffer) < element->next_timestamp) { GST_BUFFER_PTS(*buffer) = t_before;
GST_LOG_OBJECT(element, "time reversal. skipping buffer."); if(GST_CLOCK_TIME_IS_VALID(element->max_latency))
gst_buffer_unref(*buffer); GST_BUFFER_PTS(*buffer) -= element->max_latency;
*buffer = NULL; if(GST_BUFFER_PTS(*buffer) < element->next_timestamp) {
continue; GST_LOG_OBJECT(element, "time reversal. skipping buffer.");
gst_buffer_unref(*buffer);
*buffer = NULL;
continue;
}
GST_DEBUG_OBJECT(element, "heartbeat timestamp = %" GST_TIME_SECONDS_FORMAT, GST_TIME_SECONDS_ARGS(GST_BUFFER_PTS(*buffer)));
GST_BUFFER_DURATION(*buffer) = 0;
GST_BUFFER_OFFSET(*buffer) = offset;
GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET_NONE;
element->next_timestamp = GST_BUFFER_PTS(*buffer) + GST_BUFFER_DURATION(*buffer);
return result;
case EINTR:
GST_WARNING_OBJECT(element, "EINTR received from gst_buffer(), could be due to a signal or failure to get write control over the partition.");
/*
* Wait a bit and try again just in case
*/
usleep(5 * 1000);
break;
case EBUSY:
GST_ELEMENT_ERROR(element, RESOURCE, READ, (NULL), ("EBUSY received from gst_buffer(), the consumer already has a buffer assigned."));
return GST_FLOW_EOS;
case EINVAL:
GST_ELEMENT_ERROR(element, RESOURCE, READ, (NULL), ("EINVAL received from gst_buffer(), the consumer instance is not attached to a partition."));
return GST_FLOW_EOS;
case ENOENT:
GST_ELEMENT_ERROR(element, RESOURCE, READ, (NULL), ("ENOENT received from gst_buffer(), findDataID could not find a buffer with the requested ID."));
return GST_FLOW_EOS;
default:
/*
* reason for failure is not known.
* indicate end-of-stream
*/
GST_ELEMENT_ERROR(element, RESOURCE, READ, (NULL), ("unknown failure retrieving buffer from GDS shared memory. errno: [%d]", save_errno));
return GST_FLOW_EOS;
} }
GST_DEBUG_OBJECT(element, "heartbeat timestamp = %" GST_TIME_SECONDS_FORMAT, GST_TIME_SECONDS_ARGS(GST_BUFFER_PTS(*buffer)));
GST_BUFFER_DURATION(*buffer) = 0;
GST_BUFFER_OFFSET(*buffer) = offset;
GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET_NONE;
element->next_timestamp = GST_BUFFER_PTS(*buffer) + GST_BUFFER_DURATION(*buffer);
return result;
} else {
/*
* reason for failure is not known.
* indicate end-of-stream
*/
GST_ELEMENT_ERROR(element, RESOURCE, READ, (NULL), ("unknown failure retrieving buffer from GDS shared memory. possible causes include: timeout, interupted by signal, no data available."));
return GST_FLOW_EOS;
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment