Commit 8fd405ce authored by Jonathan Hanks's avatar Jonathan Hanks
WIP adding http diagnostics to the standalone_edc.

This allows disconnected channel lists to be extracted from the edc.
parent b49b55e9
......@@ -26,6 +26,8 @@ FIND_PACKAGE(OpenMX)
CHECK_CXX_SOURCE_COMPILES("#include <iostream>
#include <FlexLexer.h>
if (Boost_FOUND)
target_include_directories(standalone_edc PUBLIC
target_link_libraries(standalone_edc PUBLIC
target_requires_cpp11(standalone_edc PUBLIC)
install(TARGETS standalone_edc DESTINATION bin)
install(TARGETS standalone_edc DESTINATION bin)
\ No newline at end of file
message(WARNING "The standalone_edc will not be build as boost was not found")
endif (Boost_FOUND)
\ No newline at end of file
......@@ -14,6 +14,12 @@ of this distribution.
// - Make appropriate log file entries
// - Get rid of need to build
#include <algorithm>
#include <iterator>
#include <string>
#include <thread>
#include <utility>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
......@@ -28,6 +34,14 @@ of this distribution.
#include <daqmap.h>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
/* taken from channel.h in the daqd source
* find a way to do this better
......@@ -57,6 +71,38 @@ extern "C" {
#include <iostream>
#define EDCU_MAX_CHANS 50000
using boost::asio::ip::tcp;
using io_context_t = boost::asio::io_service;
inline io_context_t&
get_context( tcp::acceptor& acceptor )
return acceptor.get_io_service( );
inline boost::asio::ip::address
make_address( const char* str )
return boost::asio::ip::address::from_string( str );
using io_context_t = boost::asio::io_context;
inline io_context_t&
get_context( tcp::acceptor& acceptor )
return acceptor.get_executor( ).context( );
inline boost::asio::ip::address
make_address( const char* str )
return boost::asio::ip::make_address( str );
// Function prototypes
// ****************************************************************************************
......@@ -111,6 +157,97 @@ int timemarks[ 16 ] = { 1000 * 1000, 63500 * 1000, 126000 * 1000,
938500 * 1000 };
int nextTrig = 0;
* @brief A collection of information used for diagnostic output
struct diag_info_block
diag_info_block( ) : con_chans( 0 ), status( )
std::fill( std::begin( status ), std::end( status ), 0xbad );
diag_info_block( const diag_info_block& other )
: con_chans( other.con_chans ), status( )
std::copy( std::begin( other.status ),
std::end( other.status ),
std::begin( status ) );
operator=( const diag_info_block& other )
con_chans = other.con_chans;
std::copy( std::begin( other.status ),
std::end( other.status ),
std::begin( status ) );
return *this;
int con_chans;
int status[ EDCU_MAX_CHANS ];
typedef boost::lockfree::spsc_queue<
boost::lockfree::capacity< DIAG_QUEUE_DEPTH > >
* @brief this structure is used to refer to the message queues used between the
* main thread and the diag thread.
struct diag_thread_queues
diag_thread_queues( diag_queue_t& message_queue,
diag_queue_t& free_list_queue )
: msg_queue( message_queue ), free_queue( free_list_queue )
diag_queue_t& msg_queue;
diag_queue_t& free_queue;
* @brief this structure is used to give the diag thread the information it
* needs to start and run.
struct diag_thread_args
diag_thread_args( const std::string& hostname,
int port,
diag_queue_t& message_queue,
diag_queue_t& free_list_queue )
: address( hostname, port ), queues( message_queue, free_list_queue )
diag_thread_args( diag_queue_t& message_queue,
diag_queue_t& free_list_queue )
: address( "", 9000 ), queues( message_queue, free_list_queue )
std::pair< std::string, int > address;
diag_thread_queues queues;
struct diag_client_conn
diag_client_conn( io_context_t& io )
: io_context( io ), socket( io ), data( ), buffer( )
io_context_t& io_context;
tcp::socket socket;
diag_info_block data;
std::string buffer;
// forward
void diag_thread_mainloop( diag_thread_args* args );
void update_diag_info( diag_thread_queues& queues );
// End Header ************************************************************
......@@ -431,6 +568,7 @@ channel_parse_callback( char* channel_name,
<< static_cast< int >( _32bit_integer ) << ")\n";
exit( 1 );
edc->channel_type[ edc->num_chans ] = daq_data_type;
strncpy( edc->channel_name[ edc->num_chans ],
sizeof( edc->channel_name[ edc->num_chans ] ) );
......@@ -688,12 +826,13 @@ usage( const char* prog )
std::cout << "Usage:\n\t" << prog << " <options>\n\n";
<< "-b <mbuf name> - The name of the mbuf to write to [edc_daq]\n";
std::cout << "-d <dcu id> - The dcu id number to use [52]\n";
std::cout << "-i <ini file name> - The ini file to read [edc.ini]\n";
std::cout << "-w <wait time in ms> - Number of ms to wait after each 16Hz "
"segment has starts [0]\n";
std::cout << "-p <prefix> - Prefix to add to the connection stats channel "
std::cout << "-l <interface:port> - Where to bind to for the diagnostic "
"http output\n";
std::cout << "-h - this help\n";
std::cout << "\nThe standalone edcu is used to record epics data and put "
"it into a memory buffer which can ";
......@@ -712,6 +851,20 @@ usage( const char* prog )
std::cout << "\n";
std::pair< std::string, int >
parse_address( const std::string& str )
std::string hostname = str;
int port = 20222;
auto pos = str.find( ':' );
if ( pos != std::string::npos )
hostname = str.substr( 0, pos );
port = std::stoi( str.substr( pos + 1 ) );
return std::make_pair( std::move( hostname ), port );
/// Called on EPICS startup; This is generic EPICS provided function, modified
/// for LIGO use.
......@@ -721,6 +874,14 @@ main( int argc, char* argv[] )
// Initialize request for file load on startup.
int send_daq_reset = 0;
diag_queue_t diag_msg_queue;
diag_queue_t diag_free_queue;
diag_info_block diag_info[ DIAG_QUEUE_DEPTH ];
for ( int i = 0; i < DIAG_QUEUE_DEPTH; ++i )
diag_free_queue.push( &( diag_info[ i ] ) );
const char* daqsharedmemname = "edc_daq";
// const char* syncsharedmemname = "-";
const char* daqFile = "edc.ini";
......@@ -728,8 +889,10 @@ main( int argc, char* argv[] )
int delay_multiplier = 0;
diag_thread_args diag_args( diag_msg_queue, diag_free_queue );
int cur_arg = 0;
while ( ( cur_arg = getopt( argc, argv, "b:i:w:p:h" ) ) != EOF )
while ( ( cur_arg = getopt( argc, argv, "b:i:w:p:l:h" ) ) != EOF )
switch ( cur_arg )
......@@ -745,6 +908,9 @@ main( int argc, char* argv[] )
case 'p':
prefix = optarg;
case 'l':
diag_args.address = parse_address( optarg );
case 'h':
usage( argv[ 0 ] );
......@@ -798,6 +964,9 @@ main( int argc, char* argv[] )
int cyle = 0;
update_diag_info( diag_args.queues );
std::thread diag_thread( diag_thread_mainloop, &diag_args );
// Start Infinite Loop
// *******************************************************************************
for ( ;; )
......@@ -822,7 +991,196 @@ main( int argc, char* argv[] )
cycle = ( cycle + 1 ) % 16;
transmit_time = transmit_time + time_step;
if ( cycle == 0 )
update_diag_info( diag_args.queues );
return ( 0 );
* @brief Route to be called from the main thread to send a diag blog to the
* diag thread. This holds a snapshot of the status of the channels and
* connections.
* @param queues The set of queues used to communicate with between the threads
update_diag_info( diag_thread_queues& queues )
if ( !queues.free_queue.read_available( ) )
diag_info_block* info = nullptr;
queues.free_queue.pop( &info );
info->con_chans = daqd_edcu1.con_chans;
std::copy( std::begin( daqd_edcu1.channel_status ),
std::begin( daqd_edcu1.channel_status ) + daqd_edcu1.num_chans,
std::begin( info->status ) );
queues.msg_queue.push( info );
* @brief handler called from the diag thread periodically to pull diag
* information out of the shared diag message queue.
* @param queues The queues used to communicate with the main thread
* @param dest The data block to copy the diag information to.
get_diag_info( diag_thread_queues& queues, diag_info_block& dest )
if ( !queues.msg_queue.read_available( ) )
diag_info_block* info = nullptr;
queues.msg_queue.pop( &info );
dest = *info;
queues.free_queue.push( info );
* @brief timer callback for get_diag_info, this just ensure the get_diag_info
* is called and resets the time
* @param timer the timer
* @param queues The message queues
* @param dest where to write the diag info
get_diag_info_handler( boost::asio::deadline_timer& timer,
diag_thread_queues& queues,
diag_info_block& dest )
get_diag_info( queues, dest );
timer.expires_from_now( boost::posix_time::seconds( 1 ) );
[&timer, &queues, &dest]( const boost::system::error_code& ) {
get_diag_info_handler( timer, queues, dest );
} );
void accept_loop( io_context_t& io_context,
tcp::acceptor& acceptor,
const diag_info_block& cur_diag );
client_write( std::shared_ptr< diag_client_conn > conn )
rapidjson::StringBuffer s;
rapidjson::Writer< rapidjson::StringBuffer > writer( s );
writer.StartObject( );
writer.Key( "TotalChannels" );
writer.Int( daqd_edcu1.num_chans );
writer.Key( "ConnectedChannelCount" );
writer.Int( conn->data.con_chans );
writer.Key( "DisconnectedChannelCount" );
writer.Int( daqd_edcu1.num_chans - conn->data.con_chans );
writer.Key( "DisconnectedChannels" );
writer.StartArray( );
for ( int i = 0; i < daqd_edcu1.num_chans; ++i )
if ( conn->data.status[ i ] != 0 )
writer.String( daqd_edcu1.channel_name[ i ] );
writer.EndArray( );
writer.EndObject( );
std::ostringstream os;
os << std::hex << s.GetSize( ) << "\r\n";
conn->buffer = os.str( ) + std::string( s.GetString( ), s.GetSize( ) ) +
std::string( "\r\n0\r\n\r\n" );
boost::asio::buffer( conn->buffer ),
[conn]( const boost::system::error_code& ec, std::size_t ) {
if ( !ec )
} );
start_client_write( std::shared_ptr< diag_client_conn > conn )
static std::string header{ "HTTP/1.1 200 Ok\r\n"
"Content-Type: application/json\r\n"
"Cache-Control: max-age=1\r\n"
"Transfer-Encoding: chunked\r\n"
"Connection: close\r\n\r\n" };
boost::asio::buffer( header ),
[conn]( const boost::system::error_code& ec, std::size_t ) {
if ( !ec )
client_write( conn );
} );
handle_accept( io_context_t& io_context,
tcp::acceptor& acceptor,
std::shared_ptr< diag_client_conn > conn,
const boost::system::error_code& ec,
const diag_info_block& cur_diag )
if ( !ec )
conn->data = cur_diag;
std::cerr << "Accepting a new connection\n";
conn-> [conn]( ) { start_client_write( conn ); } );
accept_loop( io_context, acceptor, cur_diag );
accept_loop( io_context_t& io_context,
tcp::acceptor& acceptor,
const diag_info_block& cur_diag )
std::shared_ptr< diag_client_conn > conn =
std::make_shared< diag_client_conn >( io_context );
acceptor.async_accept( conn->socket,
[&io_context, &acceptor, conn, &cur_diag](
const boost::system::error_code& ec ) {
io_context, acceptor, conn, ec, cur_diag );
} );
* @brief the diag thread main loop
* @param args Arguments to the diag thread
diag_thread_mainloop( diag_thread_args* args )
diag_info_block cur_diag;
io_context_t io_context;
io_context_t::work net_work( io_context );
boost::asio::deadline_timer diag_timer( io_context );
get_diag_info_handler( diag_timer, args->queues, cur_diag );
tcp::acceptor acceptor(
tcp::endpoint( make_address( args->address.first.c_str( ) ),
args->address.second ) );
accept_loop( io_context, acceptor, cur_diag ); );
