From 8fd405cea5fdfbe171e5f2eb5bb4f356b2fd8ebb Mon Sep 17 00:00:00 2001 From: Jonathan Hanks <jonathan.hanks@ligo.org> Date: Fri, 4 Oct 2019 22:10:49 -0700 Subject: [PATCH] WIP adding http diagnostics to the standalone_edc. This allows disconnected channel lists to be extracted from the edc. --- CMakeLists.txt | 2 + src/epics/seq/CMakeLists.txt | 16 +- src/epics/seq/standalone_edcu.cc | 362 ++++++++++++++++++++++++++++++- 3 files changed, 375 insertions(+), 5 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c35c1df1..04d0cb8ed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,6 +26,8 @@ FIND_PACKAGE(OpenMX) FIND_PACKAGE(ZMQ4) FIND_PACKAGE(Dolphin) FIND_PACKAGE(NDS2Client) +FIND_PACKAGE(Boost COMPONENTS system) +FIND_PACKAGE(RapidJSON) CHECK_CXX_SOURCE_COMPILES("#include <iostream> #include <FlexLexer.h> diff --git a/src/epics/seq/CMakeLists.txt b/src/epics/seq/CMakeLists.txt index 4d1d6722a..32e9152f3 100644 --- a/src/epics/seq/CMakeLists.txt +++ b/src/epics/seq/CMakeLists.txt @@ -1,11 +1,21 @@ +if (Boost_FOUND) + add_executable(standalone_edc standalone_edcu.cc ${CMAKE_CURRENT_SOURCE_DIR}/../../drv/rfm.c) target_include_directories(standalone_edc PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../include" - "${CMAKE_CURRENT_SOURCE_DIR}/../../include/drv") + "${CMAKE_CURRENT_SOURCE_DIR}/../../include/drv" + ${Boost_INCLUDE_DIRS}) target_link_libraries(standalone_edc PUBLIC epics::ca - driver::ini_parsing) + driver::ini_parsing + ${Boost_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT}) +target_requires_cpp11(standalone_edc PUBLIC) + +install(TARGETS standalone_edc DESTINATION bin) -install(TARGETS standalone_edc DESTINATION bin) \ No newline at end of file +else(Boost_FOUND) + message(WARNING "The standalone_edc will not be build as boost was not found") +endif (Boost_FOUND) \ No newline at end of file diff --git a/src/epics/seq/standalone_edcu.cc b/src/epics/seq/standalone_edcu.cc index d16a57d52..ae0e2fd83 100644 --- a/src/epics/seq/standalone_edcu.cc +++ b/src/epics/seq/standalone_edcu.cc @@ -14,6 +14,12 @@ of this distribution. // - Make appropriate log file entries // - Get rid of need to build skeleton.st +#include <algorithm> +#include <iterator> +#include <string> +#include <thread> +#include <utility> + #include <stddef.h> #include <stdlib.h> #include <string.h> @@ -28,6 +34,14 @@ of this distribution. #include <daqmap.h> +#define BOOST_ASIO_USE_BOOST_DATE_TIME_FOR_SOCKET_IOSTREAM +#include <boost/asio.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/lockfree/spsc_queue.hpp> + +#include <rapidjson/writer.h> +#include <rapidjson/stringbuffer.h> + /* taken from channel.h in the daqd source * find a way to do this better */ @@ -57,6 +71,38 @@ extern "C" { #include <iostream> #define EDCU_MAX_CHANS 50000 +#define DIAG_QUEUE_DEPTH 3 + +using boost::asio::ip::tcp; + +#if BOOST_ASIO_VERSION < 101200 +using io_context_t = boost::asio::io_service; +inline io_context_t& +get_context( tcp::acceptor& acceptor ) +{ + return acceptor.get_io_service( ); +} + +inline boost::asio::ip::address +make_address( const char* str ) +{ + return boost::asio::ip::address::from_string( str ); +} +#else +using io_context_t = boost::asio::io_context; + +inline io_context_t& +get_context( tcp::acceptor& acceptor ) +{ + return acceptor.get_executor( ).context( ); +} + +inline boost::asio::ip::address +make_address( const char* str ) +{ + return boost::asio::ip::make_address( str ); +} +#endif // Function prototypes // **************************************************************************************** @@ -111,6 +157,97 @@ int timemarks[ 16 ] = { 1000 * 1000, 63500 * 1000, 126000 * 1000, 938500 * 1000 }; int nextTrig = 0; +/*! + * @brief A collection of information used for diagnostic output + */ +struct diag_info_block +{ + diag_info_block( ) : con_chans( 0 ), status( ) + { + std::fill( std::begin( status ), std::end( status ), 0xbad ); + } + diag_info_block( const diag_info_block& other ) + : con_chans( other.con_chans ), status( ) + { + std::copy( std::begin( other.status ), + std::end( other.status ), + std::begin( status ) ); + } + diag_info_block& + operator=( const diag_info_block& other ) + { + con_chans = other.con_chans; + std::copy( std::begin( other.status ), + std::end( other.status ), + std::begin( status ) ); + return *this; + } + + int con_chans; + int status[ EDCU_MAX_CHANS ]; +}; + +typedef boost::lockfree::spsc_queue< + diag_info_block*, + boost::lockfree::capacity< DIAG_QUEUE_DEPTH > > + diag_queue_t; + +/*! + * @brief this structure is used to refer to the message queues used between the + * main thread and the diag thread. + */ +struct diag_thread_queues +{ + diag_thread_queues( diag_queue_t& message_queue, + diag_queue_t& free_list_queue ) + : msg_queue( message_queue ), free_queue( free_list_queue ) + { + } + + diag_queue_t& msg_queue; + diag_queue_t& free_queue; +}; + +/*! + * @brief this structure is used to give the diag thread the information it + * needs to start and run. + */ +struct diag_thread_args +{ + diag_thread_args( const std::string& hostname, + int port, + diag_queue_t& message_queue, + diag_queue_t& free_list_queue ) + : address( hostname, port ), queues( message_queue, free_list_queue ) + { + } + diag_thread_args( diag_queue_t& message_queue, + diag_queue_t& free_list_queue ) + : address( "127.0.0.1", 9000 ), queues( message_queue, free_list_queue ) + { + } + std::pair< std::string, int > address; + diag_thread_queues queues; +}; + +struct diag_client_conn +{ + diag_client_conn( io_context_t& io ) + : io_context( io ), socket( io ), data( ), buffer( ) + { + } + + io_context_t& io_context; + tcp::socket socket; + diag_info_block data; + + std::string buffer; +}; + +// forward +void diag_thread_mainloop( diag_thread_args* args ); +void update_diag_info( diag_thread_queues& queues ); + // End Header ************************************************************ // @@ -431,6 +568,7 @@ channel_parse_callback( char* channel_name, << static_cast< int >( _32bit_integer ) << ")\n"; exit( 1 ); } + edc->channel_type[ edc->num_chans ] = daq_data_type; strncpy( edc->channel_name[ edc->num_chans ], channel_name, sizeof( edc->channel_name[ edc->num_chans ] ) ); @@ -688,12 +826,13 @@ usage( const char* prog ) std::cout << "Usage:\n\t" << prog << " <options>\n\n"; std::cout << "-b <mbuf name> - The name of the mbuf to write to [edc_daq]\n"; - std::cout << "-d <dcu id> - The dcu id number to use [52]\n"; std::cout << "-i <ini file name> - The ini file to read [edc.ini]\n"; std::cout << "-w <wait time in ms> - Number of ms to wait after each 16Hz " "segment has starts [0]\n"; std::cout << "-p <prefix> - Prefix to add to the connection stats channel " "names\n"; + std::cout << "-l <interface:port> - Where to bind to for the diagnostic " + "http output\n"; std::cout << "-h - this help\n"; std::cout << "\nThe standalone edcu is used to record epics data and put " "it into a memory buffer which can "; @@ -712,6 +851,20 @@ usage( const char* prog ) std::cout << "\n"; } +std::pair< std::string, int > +parse_address( const std::string& str ) +{ + std::string hostname = str; + int port = 20222; + auto pos = str.find( ':' ); + if ( pos != std::string::npos ) + { + hostname = str.substr( 0, pos ); + port = std::stoi( str.substr( pos + 1 ) ); + } + return std::make_pair( std::move( hostname ), port ); +} + /// Called on EPICS startup; This is generic EPICS provided function, modified /// for LIGO use. int @@ -721,6 +874,14 @@ main( int argc, char* argv[] ) // Initialize request for file load on startup. int send_daq_reset = 0; + diag_queue_t diag_msg_queue; + diag_queue_t diag_free_queue; + diag_info_block diag_info[ DIAG_QUEUE_DEPTH ]; + for ( int i = 0; i < DIAG_QUEUE_DEPTH; ++i ) + { + diag_free_queue.push( &( diag_info[ i ] ) ); + } + const char* daqsharedmemname = "edc_daq"; // const char* syncsharedmemname = "-"; const char* daqFile = "edc.ini"; @@ -728,8 +889,10 @@ main( int argc, char* argv[] ) int delay_multiplier = 0; + diag_thread_args diag_args( diag_msg_queue, diag_free_queue ); + int cur_arg = 0; - while ( ( cur_arg = getopt( argc, argv, "b:i:w:p:h" ) ) != EOF ) + while ( ( cur_arg = getopt( argc, argv, "b:i:w:p:l:h" ) ) != EOF ) { switch ( cur_arg ) { @@ -745,6 +908,9 @@ main( int argc, char* argv[] ) case 'p': prefix = optarg; break; + case 'l': + diag_args.address = parse_address( optarg ); + break; case 'h': default: usage( argv[ 0 ] ); @@ -798,6 +964,9 @@ main( int argc, char* argv[] ) int cyle = 0; + update_diag_info( diag_args.queues ); + std::thread diag_thread( diag_thread_mainloop, &diag_args ); + // Start Infinite Loop // ******************************************************************************* for ( ;; ) @@ -822,7 +991,196 @@ main( int argc, char* argv[] ) cycle = ( cycle + 1 ) % 16; transmit_time = transmit_time + time_step; + + if ( cycle == 0 ) + { + update_diag_info( diag_args.queues ); + } } return ( 0 ); } + +/*! + * @brief Route to be called from the main thread to send a diag blog to the + * diag thread. This holds a snapshot of the status of the channels and + * connections. + * @param queues The set of queues used to communicate with between the threads + */ +void +update_diag_info( diag_thread_queues& queues ) +{ + if ( !queues.free_queue.read_available( ) ) + { + return; + } + diag_info_block* info = nullptr; + queues.free_queue.pop( &info ); + + info->con_chans = daqd_edcu1.con_chans; + std::copy( std::begin( daqd_edcu1.channel_status ), + std::begin( daqd_edcu1.channel_status ) + daqd_edcu1.num_chans, + std::begin( info->status ) ); + + queues.msg_queue.push( info ); +} + +/*! + * @brief handler called from the diag thread periodically to pull diag + * information out of the shared diag message queue. + * @param queues The queues used to communicate with the main thread + * @param dest The data block to copy the diag information to. + */ +void +get_diag_info( diag_thread_queues& queues, diag_info_block& dest ) +{ + if ( !queues.msg_queue.read_available( ) ) + { + return; + } + diag_info_block* info = nullptr; + queues.msg_queue.pop( &info ); + + dest = *info; + + queues.free_queue.push( info ); +} + +/*! + * @brief timer callback for get_diag_info, this just ensure the get_diag_info + * is called and resets the time + * @param timer the timer + * @param queues The message queues + * @param dest where to write the diag info + */ +void +get_diag_info_handler( boost::asio::deadline_timer& timer, + diag_thread_queues& queues, + diag_info_block& dest ) +{ + get_diag_info( queues, dest ); + timer.expires_from_now( boost::posix_time::seconds( 1 ) ); + timer.async_wait( + [&timer, &queues, &dest]( const boost::system::error_code& ) { + get_diag_info_handler( timer, queues, dest ); + } ); +} + +void accept_loop( io_context_t& io_context, + tcp::acceptor& acceptor, + const diag_info_block& cur_diag ); + +void +client_write( std::shared_ptr< diag_client_conn > conn ) +{ + rapidjson::StringBuffer s; + rapidjson::Writer< rapidjson::StringBuffer > writer( s ); + + writer.StartObject( ); + writer.Key( "TotalChannels" ); + writer.Int( daqd_edcu1.num_chans ); + writer.Key( "ConnectedChannelCount" ); + writer.Int( conn->data.con_chans ); + writer.Key( "DisconnectedChannelCount" ); + writer.Int( daqd_edcu1.num_chans - conn->data.con_chans ); + writer.Key( "DisconnectedChannels" ); + writer.StartArray( ); + for ( int i = 0; i < daqd_edcu1.num_chans; ++i ) + { + if ( conn->data.status[ i ] != 0 ) + { + writer.String( daqd_edcu1.channel_name[ i ] ); + } + } + writer.EndArray( ); + writer.EndObject( ); + + std::ostringstream os; + os << std::hex << s.GetSize( ) << "\r\n"; + conn->buffer = os.str( ) + std::string( s.GetString( ), s.GetSize( ) ) + + std::string( "\r\n0\r\n\r\n" ); + + boost::asio::async_write( + conn->socket, + boost::asio::buffer( conn->buffer ), + [conn]( const boost::system::error_code& ec, std::size_t ) { + if ( !ec ) + { + } + } ); +} + +void +start_client_write( std::shared_ptr< diag_client_conn > conn ) +{ + static std::string header{ "HTTP/1.1 200 Ok\r\n" + "Content-Type: application/json\r\n" + "Cache-Control: max-age=1\r\n" + "Transfer-Encoding: chunked\r\n" + "Connection: close\r\n\r\n" }; + + boost::asio::async_write( + conn->socket, + boost::asio::buffer( header ), + [conn]( const boost::system::error_code& ec, std::size_t ) { + if ( !ec ) + { + client_write( conn ); + } + } ); +} + +void +handle_accept( io_context_t& io_context, + tcp::acceptor& acceptor, + std::shared_ptr< diag_client_conn > conn, + const boost::system::error_code& ec, + const diag_info_block& cur_diag ) +{ + if ( !ec ) + { + conn->data = cur_diag; + } + std::cerr << "Accepting a new connection\n"; + conn->io_context.post( [conn]( ) { start_client_write( conn ); } ); + accept_loop( io_context, acceptor, cur_diag ); +} + +void +accept_loop( io_context_t& io_context, + tcp::acceptor& acceptor, + const diag_info_block& cur_diag ) +{ + std::shared_ptr< diag_client_conn > conn = + std::make_shared< diag_client_conn >( io_context ); + acceptor.async_accept( conn->socket, + [&io_context, &acceptor, conn, &cur_diag]( + const boost::system::error_code& ec ) { + handle_accept( + io_context, acceptor, conn, ec, cur_diag ); + } ); +} + +/*! + * @brief the diag thread main loop + * @param args Arguments to the diag thread + */ +void +diag_thread_mainloop( diag_thread_args* args ) +{ + diag_info_block cur_diag; + + io_context_t io_context; + io_context_t::work net_work( io_context ); + + boost::asio::deadline_timer diag_timer( io_context ); + get_diag_info_handler( diag_timer, args->queues, cur_diag ); + + tcp::acceptor acceptor( + io_context, + tcp::endpoint( make_address( args->address.first.c_str( ) ), + args->address.second ) ); + + accept_loop( io_context, acceptor, cur_diag ); + io_context.run( ); +} -- GitLab