Skip to content
Snippets Groups Projects
Commit e9cb5b2b authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

More WIP on the standalone edc, bounded memory use for diag client.

Introduce fixed sized buffers and split the iteration of the channel
list into stages.  This bounds the memory use of the each connected
client.
parent 8fd405ce
No related branches found
No related tags found
No related merge requests found
...@@ -14,7 +14,9 @@ of this distribution. ...@@ -14,7 +14,9 @@ of this distribution.
// - Make appropriate log file entries // - Make appropriate log file entries
// - Get rid of need to build skeleton.st // - Get rid of need to build skeleton.st
#include <atomic>
#include <algorithm> #include <algorithm>
#include <array>
#include <iterator> #include <iterator>
#include <string> #include <string>
#include <thread> #include <thread>
...@@ -119,18 +121,18 @@ typedef union edc_data_t ...@@ -119,18 +121,18 @@ typedef union edc_data_t
unsigned long daqFileCrc; unsigned long daqFileCrc;
typedef struct daqd_c typedef struct daqd_c
{ {
int num_chans; int num_chans;
int con_chans; std::atomic< int > con_chans;
int val_events; int val_events;
int con_events; int con_events;
daq_data_t channel_type[ EDCU_MAX_CHANS ]; daq_data_t channel_type[ EDCU_MAX_CHANS ];
edc_data_t channel_value[ EDCU_MAX_CHANS ]; edc_data_t channel_value[ EDCU_MAX_CHANS ];
char channel_name[ EDCU_MAX_CHANS ][ 64 ]; char channel_name[ EDCU_MAX_CHANS ][ 64 ];
int channel_status[ EDCU_MAX_CHANS ]; int channel_status[ EDCU_MAX_CHANS ];
long gpsTime; long gpsTime;
long epicsSync; long epicsSync;
char* prefix; char* prefix;
int dcuid; int dcuid;
} daqd_c; } daqd_c;
int num_chans_index = -1; int num_chans_index = -1;
...@@ -230,10 +232,173 @@ struct diag_thread_args ...@@ -230,10 +232,173 @@ struct diag_thread_args
diag_thread_queues queues; diag_thread_queues queues;
}; };
/*!
* @brief A Stream interface for rapid json to be paired with a writer. It
* doesn't write any data, instead calculating the size in bytes that would be
* needed to actually encode the data written to it.
*/
class SizeCalcJsonStream
{
public:
typedef char Ch;
SizeCalcJsonStream( ) : count_( 0 )
{
}
Ch
Peek( ) const
{
throw std::runtime_error( "Peek not implemented" );
}
Ch
Take( )
{
throw std::runtime_error( "Take not implemented" );
}
size_t
Tell( )
{
throw std::runtime_error( "Tell not implemented" );
}
Ch*
PutBegin( )
{
throw std::runtime_error( "PutBegin not implemented" );
};
void
Put( Ch c )
{
++count_;
}
void
Flush( )
{
}
size_t
PutEnd( Ch* begin )
{
throw std::runtime_error( "PutEnd not implemented" );
}
bool
SpaceAvailable( ) const
{
return true;
}
std::size_t
GetSize( ) const
{
return count_;
}
private:
std::size_t count_;
};
/*!
* @brief A Stream interface for rapid json to be paired with a writer. It
* writes into a fixed size buffer, and reports if there some space left.
*/
template < std::size_t MIN_SIZE >
class FixedSizeJsonStream
{
public:
typedef char Ch;
FixedSizeJsonStream( char* begin, char* end )
: begin_( begin ), cur_( begin ), end_( end )
{
}
Ch
Peek( ) const
{
throw std::runtime_error( "Peek not implemented" );
}
Ch
Take( )
{
throw std::runtime_error( "Take not implemented" );
}
size_t
Tell( )
{
throw std::runtime_error( "Tell not implemented" );
}
Ch*
PutBegin( )
{
throw std::runtime_error( "PutBegin not implemented" );
};
void
Put( Ch c )
{
if ( cur_ < end_ )
{
*cur_ = c;
++cur_;
}
else
{
throw std::out_of_range( "The FixedSizeBuffer has overflown" );
}
}
void
Flush( )
{
}
size_t
PutEnd( Ch* begin )
{
throw std::runtime_error( "PutEnd not implemented" );
}
void
AdvanceCounter( std::size_t count )
{
cur_ += count;
}
bool
SpaceAvailable( ) const
{
return ( end_ - cur_ ) > MIN_SIZE;
}
Ch*
GetString( ) const
{
return begin_;
}
std::size_t
GetSize( ) const
{
return cur_ - begin_;
}
void
Reset( )
{
cur_ = begin_;
}
private:
Ch* begin_;
Ch* cur_;
Ch* end_;
};
struct diag_client_conn struct diag_client_conn
{ {
diag_client_conn( io_context_t& io ) explicit diag_client_conn( io_context_t& io )
: io_context( io ), socket( io ), data( ), buffer( ) : io_context( io ), socket( io ), data( ), buffer( ),
buffer_stream( buffer.data( ), buffer.data( ) + buffer.size( ) ),
writer( buffer_stream ), list_progress( 0 )
{ {
} }
...@@ -241,7 +406,10 @@ struct diag_client_conn ...@@ -241,7 +406,10 @@ struct diag_client_conn
tcp::socket socket; tcp::socket socket;
diag_info_block data; diag_info_block data;
std::string buffer; std::array< char, 32000 > buffer;
FixedSizeJsonStream< 256 > buffer_stream;
rapidjson::Writer< FixedSizeJsonStream< 256 > > writer;
int list_progress;
}; };
// forward // forward
...@@ -1070,42 +1238,121 @@ void accept_loop( io_context_t& io_context, ...@@ -1070,42 +1238,121 @@ void accept_loop( io_context_t& io_context,
tcp::acceptor& acceptor, tcp::acceptor& acceptor,
const diag_info_block& cur_diag ); const diag_info_block& cur_diag );
/*!
* @brief start writing the json by doing the initial header
* @tparam Writer The writer
* @param writer the writer
* @param conn the connection diag info to dump
*/
template < typename Writer >
void void
client_write( std::shared_ptr< diag_client_conn > conn ) client_open_json( Writer& writer, diag_client_conn& conn )
{ {
rapidjson::StringBuffer s;
rapidjson::Writer< rapidjson::StringBuffer > writer( s );
writer.StartObject( ); writer.StartObject( );
writer.Key( "TotalChannels" ); writer.Key( "TotalChannels" );
writer.Int( daqd_edcu1.num_chans ); writer.Int( daqd_edcu1.num_chans );
writer.Key( "ConnectedChannelCount" ); writer.Key( "ConnectedChannelCount" );
writer.Int( conn->data.con_chans ); writer.Int( conn.data.con_chans );
writer.Key( "DisconnectedChannelCount" ); writer.Key( "DisconnectedChannelCount" );
writer.Int( daqd_edcu1.num_chans - conn->data.con_chans ); writer.Int( daqd_edcu1.num_chans - conn.data.con_chans );
writer.Key( "DisconnectedChannels" ); writer.Key( "DisconnectedChannels" );
writer.StartArray( ); writer.StartArray( );
for ( int i = 0; i < daqd_edcu1.num_chans; ++i ) }
/*!
* @brief iteratate over the channel list and output channel entries until
* the list is done, or the supplied buffer reports no more room.
* @tparam Writer The writer
* @tparam Buffer The buffer
* @param writer The writer
* @param buf The buffer
* @param conn The connection and diag info to iterate over
* @param cur The index to start on
* @return The next index to start at (daqd_edc1.num_chans when it is complete)
*/
template < typename Writer, typename Buffer >
int
client_fill_json( Writer& writer, Buffer& buf, diag_client_conn& conn, int cur )
{
for ( ; cur != daqd_edcu1.num_chans && buf.SpaceAvailable( ); ++cur )
{ {
if ( conn->data.status[ i ] != 0 ) if ( conn.data.status[ cur ] != 0 )
{ {
writer.String( daqd_edcu1.channel_name[ i ] ); writer.String( daqd_edcu1.channel_name[ cur ] );
} }
} }
return cur;
}
/*!
* @brief Close out the client diagnositcs json message.
* @tparam Writer The writer
* @param writer the writer
*/
template < typename Writer >
void
client_close_json( Writer& writer )
{
writer.EndArray( ); writer.EndArray( );
writer.EndObject( ); writer.EndObject( );
}
/*!
* @brief calculate the size needed to hold a json message representing the
* diagnostics for the given client connection
* @param conn client connection and diagnostics object
* @return the number of bytes needed to hold the json message
*/
std::size_t
calc_required_size_for_json( diag_client_conn& conn )
{
SizeCalcJsonStream counting_buffer;
rapidjson::Writer< SizeCalcJsonStream > writer( counting_buffer );
client_open_json( writer, conn );
client_fill_json( writer, counting_buffer, conn, 0 );
client_close_json( writer );
return counting_buffer.GetSize( );
}
void
client_finalize( std::shared_ptr< diag_client_conn > conn )
{
conn->buffer_stream.Reset( );
client_close_json( conn->writer );
boost::asio::async_write(
conn->socket,
boost::asio::buffer( conn->buffer_stream.GetString( ),
conn->buffer_stream.GetSize( ) ),
[conn]( const boost::system::error_code& ec, std::size_t ) {} );
}
void
client_write( std::shared_ptr< diag_client_conn > conn )
{
if ( conn->list_progress == daqd_edcu1.num_chans )
{
client_finalize( std::move( conn ) );
return;
}
conn->buffer_stream.Reset( );
conn->list_progress = client_fill_json(
conn->writer, conn->buffer_stream, *conn, conn->list_progress );
std::ostringstream os; if ( conn->buffer_stream.GetSize( ) == 0 )
os << std::hex << s.GetSize( ) << "\r\n"; {
conn->buffer = os.str( ) + std::string( s.GetString( ), s.GetSize( ) ) + client_finalize( std::move( conn ) );
std::string( "\r\n0\r\n\r\n" ); return;
}
boost::asio::async_write( boost::asio::async_write(
conn->socket, conn->socket,
boost::asio::buffer( conn->buffer ), boost::asio::buffer( conn->buffer_stream.GetString( ),
conn->buffer_stream.GetSize( ) ),
[conn]( const boost::system::error_code& ec, std::size_t ) { [conn]( const boost::system::error_code& ec, std::size_t ) {
if ( !ec ) if ( !ec )
{ {
client_write( conn );
} }
} ); } );
} }
...@@ -1113,15 +1360,36 @@ client_write( std::shared_ptr< diag_client_conn > conn ) ...@@ -1113,15 +1360,36 @@ client_write( std::shared_ptr< diag_client_conn > conn )
void void
start_client_write( std::shared_ptr< diag_client_conn > conn ) start_client_write( std::shared_ptr< diag_client_conn > conn )
{ {
static std::string header{ "HTTP/1.1 200 Ok\r\n" const static char* header{ "HTTP/1.0 200 Ok\r\n"
"Content-Type: application/json\r\n" "Content-Type: application/json\r\n"
"Cache-Control: max-age=1\r\n" "Cache-Control: max-age=1\r\n"
"Transfer-Encoding: chunked\r\n" "Content-Length: %d\r\n"
"Connection: close\r\n\r\n" }; "Connection: close\r\n\r\n" };
std::size_t req_size = calc_required_size_for_json( *conn );
std::size_t header_size = static_cast< std::size_t >(
snprintf( conn->buffer.data( ),
conn->buffer.size( ),
header,
static_cast< int >( req_size ) ) );
if ( header_size >= conn->buffer.size( ) )
{
throw std::out_of_range( "Buffer overflowed while sending headers" );
}
conn->buffer_stream.Reset( );
conn->buffer_stream.AdvanceCounter( header_size );
conn->writer.Reset( conn->buffer_stream );
conn->list_progress = 0;
client_open_json( conn->writer, *conn );
conn->list_progress = client_fill_json(
conn->writer, conn->buffer_stream, *conn, conn->list_progress );
boost::asio::async_write( boost::asio::async_write(
conn->socket, conn->socket,
boost::asio::buffer( header ), boost::asio::buffer( conn->buffer_stream.GetString( ),
conn->buffer_stream.GetSize( ) ),
[conn]( const boost::system::error_code& ec, std::size_t ) { [conn]( const boost::system::error_code& ec, std::size_t ) {
if ( !ec ) if ( !ec )
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment