Skip to content
Snippets Groups Projects
Commit 8219ff91 authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Adding a timeout to the tcp subscriptions.

 * Currently defaulting to a 60s timeout.
 * Applies to connection, read, and write operations.
parent ad208a27
Branches master
No related merge requests found
......@@ -60,7 +60,7 @@ namespace pub_sub
// std::cout << "Create sub client addr: " <<
// (void*)this
// << std::endl;
timer_.expires_from_now( boost::asio::chrono::seconds( 60 ) );
s_.async_connect( endpoint,
[ self = shared_from_this( ) ](
const boost::system::error_code& ec ) {
......@@ -72,13 +72,7 @@ namespace pub_sub
self->connected_ = true;
self->request_message( );
} );
timer_.async_wait( [ self = shared_from_this( ) ](
const boost::system::error_code& ec ) {
if ( !ec )
{
self->connection_timeout( );
}
} );
start_timeout_timer();
}
~SubClient( )
......@@ -108,9 +102,18 @@ namespace pub_sub
private:
void
connection_timeout( )
start_timeout_timer()
{
timer_.expires_from_now( timeout_ );
timer_.async_wait( [self = shared_from_this( )](
const boost::system::error_code& ec ) {
self->connection_timeout( ec );
} );
}
void
connection_timeout( const boost::system::error_code& ec )
{
if ( connected_ )
if (ec)
{
return;
}
......@@ -152,12 +155,14 @@ namespace pub_sub
[ self = shared_from_this( ) ](
const boost::system::error_code& ec,
std::size_t bytes_transferred ) {
self->timer_.cancel();
if ( ec )
{
return;
}
self->read_message_header( );
} );
start_timeout_timer();
}
/*!
......@@ -173,12 +178,14 @@ namespace pub_sub
[ self = shared_from_this( ) ](
const ::boost::system::error_code& ec,
std::size_t bytes_transferred ) {
self->timer_.cancel();
if ( ec )
{
return;
}
self->read_message_body( );
} );
start_timeout_timer();
}
/*!
......@@ -199,10 +206,11 @@ namespace pub_sub
boost::asio::async_read(
s_,
buf,
[ self = shared_from_this( ),
data_buf = std::move( data ),
start ]( const ::boost::system::error_code& ec,
std::size_t bytes_transferred ) mutable {
[self = shared_from_this( ),
data_buf = std::move( data ),
start]( const ::boost::system::error_code& ec,
std::size_t bytes_transferred ) mutable {
self->timer_.cancel();
if ( ec )
{
return;
......@@ -210,6 +218,7 @@ namespace pub_sub
self->receive_message_body( std::move( data_buf ),
start );
} );
start_timeout_timer();
}
/*!
......@@ -248,6 +257,7 @@ namespace pub_sub
std::atomic< bool >& stopping_;
bool connected_{ false };
boost::asio::steady_timer timer_;
std::chrono::duration<std::int64_t> timeout_{ std::chrono::seconds(60) };
friend void intrusive_ptr_add_ref( SubClient* );
friend void intrusive_ptr_release( SubClient* );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment