diff --git a/src/epics/seq/standalone_edcu.cc b/src/epics/seq/standalone_edcu.cc index ae0e2fd83b0143f5264d0a0db942fa47064a9c23..3d7e93466c23366d405690e129b1bb720b65d199 100644 --- a/src/epics/seq/standalone_edcu.cc +++ b/src/epics/seq/standalone_edcu.cc @@ -14,7 +14,9 @@ of this distribution. // - Make appropriate log file entries // - Get rid of need to build skeleton.st +#include <atomic> #include <algorithm> +#include <array> #include <iterator> #include <string> #include <thread> @@ -119,18 +121,18 @@ typedef union edc_data_t unsigned long daqFileCrc; typedef struct daqd_c { - int num_chans; - int con_chans; - int val_events; - int con_events; - daq_data_t channel_type[ EDCU_MAX_CHANS ]; - edc_data_t channel_value[ EDCU_MAX_CHANS ]; - char channel_name[ EDCU_MAX_CHANS ][ 64 ]; - int channel_status[ EDCU_MAX_CHANS ]; - long gpsTime; - long epicsSync; - char* prefix; - int dcuid; + int num_chans; + std::atomic< int > con_chans; + int val_events; + int con_events; + daq_data_t channel_type[ EDCU_MAX_CHANS ]; + edc_data_t channel_value[ EDCU_MAX_CHANS ]; + char channel_name[ EDCU_MAX_CHANS ][ 64 ]; + int channel_status[ EDCU_MAX_CHANS ]; + long gpsTime; + long epicsSync; + char* prefix; + int dcuid; } daqd_c; int num_chans_index = -1; @@ -230,10 +232,173 @@ struct diag_thread_args diag_thread_queues queues; }; +/*! + * @brief A Stream interface for rapid json to be paired with a writer. It + * doesn't write any data, instead calculating the size in bytes that would be + * needed to actually encode the data written to it. + */ +class SizeCalcJsonStream +{ +public: + typedef char Ch; + SizeCalcJsonStream( ) : count_( 0 ) + { + } + + Ch + Peek( ) const + { + throw std::runtime_error( "Peek not implemented" ); + } + Ch + Take( ) + { + throw std::runtime_error( "Take not implemented" ); + } + size_t + Tell( ) + { + throw std::runtime_error( "Tell not implemented" ); + } + + Ch* + PutBegin( ) + { + throw std::runtime_error( "PutBegin not implemented" ); + }; + + void + Put( Ch c ) + { + ++count_; + } + void + Flush( ) + { + } + size_t + PutEnd( Ch* begin ) + { + throw std::runtime_error( "PutEnd not implemented" ); + } + + bool + SpaceAvailable( ) const + { + return true; + } + + std::size_t + GetSize( ) const + { + return count_; + } + +private: + std::size_t count_; +}; + +/*! + * @brief A Stream interface for rapid json to be paired with a writer. It + * writes into a fixed size buffer, and reports if there some space left. + */ +template < std::size_t MIN_SIZE > +class FixedSizeJsonStream +{ +public: + typedef char Ch; + FixedSizeJsonStream( char* begin, char* end ) + : begin_( begin ), cur_( begin ), end_( end ) + { + } + + Ch + Peek( ) const + { + throw std::runtime_error( "Peek not implemented" ); + } + Ch + Take( ) + { + throw std::runtime_error( "Take not implemented" ); + } + size_t + Tell( ) + { + throw std::runtime_error( "Tell not implemented" ); + } + + Ch* + PutBegin( ) + { + throw std::runtime_error( "PutBegin not implemented" ); + }; + + void + Put( Ch c ) + { + if ( cur_ < end_ ) + { + *cur_ = c; + ++cur_; + } + else + { + throw std::out_of_range( "The FixedSizeBuffer has overflown" ); + } + } + void + Flush( ) + { + } + size_t + PutEnd( Ch* begin ) + { + throw std::runtime_error( "PutEnd not implemented" ); + } + + void + AdvanceCounter( std::size_t count ) + { + cur_ += count; + } + + bool + SpaceAvailable( ) const + { + return ( end_ - cur_ ) > MIN_SIZE; + } + + Ch* + GetString( ) const + { + return begin_; + } + + std::size_t + GetSize( ) const + { + return cur_ - begin_; + } + + void + Reset( ) + { + cur_ = begin_; + } + +private: + Ch* begin_; + Ch* cur_; + Ch* end_; +}; + struct diag_client_conn { - diag_client_conn( io_context_t& io ) - : io_context( io ), socket( io ), data( ), buffer( ) + explicit diag_client_conn( io_context_t& io ) + : io_context( io ), socket( io ), data( ), buffer( ), + buffer_stream( buffer.data( ), buffer.data( ) + buffer.size( ) ), + writer( buffer_stream ), list_progress( 0 ) { } @@ -241,7 +406,10 @@ struct diag_client_conn tcp::socket socket; diag_info_block data; - std::string buffer; + std::array< char, 32000 > buffer; + FixedSizeJsonStream< 256 > buffer_stream; + rapidjson::Writer< FixedSizeJsonStream< 256 > > writer; + int list_progress; }; // forward @@ -1070,42 +1238,121 @@ void accept_loop( io_context_t& io_context, tcp::acceptor& acceptor, const diag_info_block& cur_diag ); +/*! + * @brief start writing the json by doing the initial header + * @tparam Writer The writer + * @param writer the writer + * @param conn the connection diag info to dump + */ +template < typename Writer > void -client_write( std::shared_ptr< diag_client_conn > conn ) +client_open_json( Writer& writer, diag_client_conn& conn ) { - rapidjson::StringBuffer s; - rapidjson::Writer< rapidjson::StringBuffer > writer( s ); - writer.StartObject( ); writer.Key( "TotalChannels" ); writer.Int( daqd_edcu1.num_chans ); writer.Key( "ConnectedChannelCount" ); - writer.Int( conn->data.con_chans ); + writer.Int( conn.data.con_chans ); writer.Key( "DisconnectedChannelCount" ); - writer.Int( daqd_edcu1.num_chans - conn->data.con_chans ); + writer.Int( daqd_edcu1.num_chans - conn.data.con_chans ); writer.Key( "DisconnectedChannels" ); writer.StartArray( ); - for ( int i = 0; i < daqd_edcu1.num_chans; ++i ) +} + +/*! + * @brief iteratate over the channel list and output channel entries until + * the list is done, or the supplied buffer reports no more room. + * @tparam Writer The writer + * @tparam Buffer The buffer + * @param writer The writer + * @param buf The buffer + * @param conn The connection and diag info to iterate over + * @param cur The index to start on + * @return The next index to start at (daqd_edc1.num_chans when it is complete) + */ +template < typename Writer, typename Buffer > +int +client_fill_json( Writer& writer, Buffer& buf, diag_client_conn& conn, int cur ) +{ + for ( ; cur != daqd_edcu1.num_chans && buf.SpaceAvailable( ); ++cur ) { - if ( conn->data.status[ i ] != 0 ) + if ( conn.data.status[ cur ] != 0 ) { - writer.String( daqd_edcu1.channel_name[ i ] ); + writer.String( daqd_edcu1.channel_name[ cur ] ); } } + return cur; +} + +/*! + * @brief Close out the client diagnositcs json message. + * @tparam Writer The writer + * @param writer the writer + */ +template < typename Writer > +void +client_close_json( Writer& writer ) +{ writer.EndArray( ); writer.EndObject( ); +} + +/*! + * @brief calculate the size needed to hold a json message representing the + * diagnostics for the given client connection + * @param conn client connection and diagnostics object + * @return the number of bytes needed to hold the json message + */ +std::size_t +calc_required_size_for_json( diag_client_conn& conn ) +{ + SizeCalcJsonStream counting_buffer; + rapidjson::Writer< SizeCalcJsonStream > writer( counting_buffer ); + + client_open_json( writer, conn ); + client_fill_json( writer, counting_buffer, conn, 0 ); + client_close_json( writer ); + return counting_buffer.GetSize( ); +} + +void +client_finalize( std::shared_ptr< diag_client_conn > conn ) +{ + conn->buffer_stream.Reset( ); + client_close_json( conn->writer ); + boost::asio::async_write( + conn->socket, + boost::asio::buffer( conn->buffer_stream.GetString( ), + conn->buffer_stream.GetSize( ) ), + [conn]( const boost::system::error_code& ec, std::size_t ) {} ); +} + +void +client_write( std::shared_ptr< diag_client_conn > conn ) +{ + if ( conn->list_progress == daqd_edcu1.num_chans ) + { + client_finalize( std::move( conn ) ); + return; + } + conn->buffer_stream.Reset( ); + conn->list_progress = client_fill_json( + conn->writer, conn->buffer_stream, *conn, conn->list_progress ); - std::ostringstream os; - os << std::hex << s.GetSize( ) << "\r\n"; - conn->buffer = os.str( ) + std::string( s.GetString( ), s.GetSize( ) ) + - std::string( "\r\n0\r\n\r\n" ); + if ( conn->buffer_stream.GetSize( ) == 0 ) + { + client_finalize( std::move( conn ) ); + return; + } boost::asio::async_write( conn->socket, - boost::asio::buffer( conn->buffer ), + boost::asio::buffer( conn->buffer_stream.GetString( ), + conn->buffer_stream.GetSize( ) ), [conn]( const boost::system::error_code& ec, std::size_t ) { if ( !ec ) { + client_write( conn ); } } ); } @@ -1113,15 +1360,36 @@ client_write( std::shared_ptr< diag_client_conn > conn ) void start_client_write( std::shared_ptr< diag_client_conn > conn ) { - static std::string header{ "HTTP/1.1 200 Ok\r\n" + const static char* header{ "HTTP/1.0 200 Ok\r\n" "Content-Type: application/json\r\n" "Cache-Control: max-age=1\r\n" - "Transfer-Encoding: chunked\r\n" + "Content-Length: %d\r\n" "Connection: close\r\n\r\n" }; + std::size_t req_size = calc_required_size_for_json( *conn ); + + std::size_t header_size = static_cast< std::size_t >( + snprintf( conn->buffer.data( ), + conn->buffer.size( ), + header, + static_cast< int >( req_size ) ) ); + if ( header_size >= conn->buffer.size( ) ) + { + throw std::out_of_range( "Buffer overflowed while sending headers" ); + } + conn->buffer_stream.Reset( ); + conn->buffer_stream.AdvanceCounter( header_size ); + conn->writer.Reset( conn->buffer_stream ); + conn->list_progress = 0; + client_open_json( conn->writer, *conn ); + + conn->list_progress = client_fill_json( + conn->writer, conn->buffer_stream, *conn, conn->list_progress ); + boost::asio::async_write( conn->socket, - boost::asio::buffer( header ), + boost::asio::buffer( conn->buffer_stream.GetString( ), + conn->buffer_stream.GetSize( ) ), [conn]( const boost::system::error_code& ec, std::size_t ) { if ( !ec ) {