Something went wrong on our end
-
Jonathan Hanks authored
Six channels are created by the EDCU. Only three were sent to both EPICS and the data stream. * Update the standalone_edcu to export the UPTIME, GPS, and DATA_RATE_KB_PER_S values in EPICS as well. * Minor formatting changes as per clang-format run. Closes #332
Jonathan Hanks authored: Six channels are created by the EDCU. Only three were sent to both EPICS and the data stream. * Update the standalone_edcu to export the UPTIME, GPS, and DATA_RATE_KB_PER_S values in EPICS as well. * Minor formatting changes as per clang-format run. Closes #332
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
standalone_edcu.cc 52.66 KiB
/// @file /src/epics/seq/edcu.c
/// @brief Contains required 'main' function to startup EPICS sequencers,
/// along with supporting routines.
///< This code is taken from EPICS example included in the EPICS
///< distribution and modified for LIGO use.
/********************COPYRIGHT NOTIFICATION**********************************
This software was developed under a United States Government license
described on the COPYRIGHT_UniversityOfChicago file included as part
of this distribution.
****************************************************************************/
// TODO:
// - Make appropriate log file entries
// - Get rid of need to build skeleton.st
#include <atomic>
#include <algorithm>
#include <array>
#include <iterator>
#include <numeric>
#include <limits>
#include <string>
#include <thread>
#include <utility>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <stdarg.h>
#include <unistd.h>
#include <time.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <daqmap.h>
#include <daq_data_types.h>
#define BOOST_ASIO_USE_BOOST_DATE_TIME_FOR_SOCKET_IOSTREAM
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
extern "C" {
#include "findSharedMemory.h"
#include "crc.h"
#include "param.h"
}
#include "cadef.h"
#include "fb.h"
#include "../../drv/gpstime/gpstime.h"
#include "gps.hh"
#include "args.h"
#include "simple_pv.h"
#include <iostream>
#define EDCU_MAX_CHANS 60000
#define DIAG_QUEUE_DEPTH 3
using boost::asio::ip::tcp;
// Compatibility shims selected by the Boost.Asio version being compiled
// against.  Both branches provide the io_context_t alias and a free
// make_address() helper so the rest of the file can use a single spelling.
// NOTE: get_context() is only defined in the BOOST_ASIO_VERSION < 101200
// branch.
#if BOOST_ASIO_VERSION < 101200
// Older Boost.Asio: use io_service as the context type.
using io_context_t = boost::asio::io_service;
/// @brief Return the io_service associated with the given acceptor.
inline io_context_t&
get_context( tcp::acceptor& acceptor )
{
return acceptor.get_io_service( );
}
/// @brief Convert a textual address into a boost::asio::ip::address using
/// address::from_string.
inline boost::asio::ip::address
make_address( const char* str )
{
return boost::asio::ip::address::from_string( str );
}
#else
// Newer Boost.Asio: use io_context as the context type.
using io_context_t = boost::asio::io_context;
/// @brief Convert a textual address into a boost::asio::ip::address by
/// forwarding to boost::asio::ip::make_address.
inline boost::asio::ip::address
make_address( const char* str )
{
return boost::asio::ip::make_address( str );
}
#endif
/// Number of nanoseconds in one millisecond.
const long MS_NS_MULTIPLIER = 1000000;
/// Length of one 16Hz cycle, expressed in nanoseconds.
const long NS_IN_16_HZ = 1000000000 / 16;

/*!
 * @brief Convert a duration in milliseconds to nanoseconds.
 * @param ms The duration in milliseconds (may be negative)
 * @return The same duration in nanoseconds
 */
long
ms_to_ns( long ms )
{
    const long nanoseconds = MS_NS_MULTIPLIER * ms;
    return nanoseconds;
}

/*!
 * @brief Convert a duration in nanoseconds to whole milliseconds.
 * @param ns The duration in nanoseconds
 * @return The duration in milliseconds, using integer division
 */
long
ns_to_ms( long ns )
{
    const long milliseconds = ns / MS_NS_MULTIPLIER;
    return milliseconds;
}
/*!
 * @brief manage the lifetime of a simple_pv server
 * @details The server is created in the constructor and handed to
 * simple_pv_server_destroy in the destructor.  Because the destructor
 * releases the handle, the class is non-copyable: a copy would hand the
 * same handle to simple_pv_server_destroy a second time.
 */
class PvServerManager
{
public:
    /*!
     * @brief Create the server, unless the prefix is empty.
     * @param prefix Prefix passed to simple_pv_server_create.  When empty
     * no server is created and get() returns nullptr.
     * @param pvs The PV definitions passed (as pointer + count) to
     * simple_pv_server_create.
     */
    PvServerManager( const std::string& prefix, std::vector< SimplePV >& pvs )
        : server_{ ( prefix.empty( )
                         ? nullptr
                         : simple_pv_server_create(
                               prefix.c_str( ),
                               pvs.data( ),
                               static_cast< int >( pvs.size( ) ) ) ) }
    {
    }

    /// Hand the handle (which may be nullptr) to simple_pv_server_destroy.
    ~PvServerManager( )
    {
        simple_pv_server_destroy( &server_ );
    }

    // The handle has a single owner; copying would lead to a double destroy.
    PvServerManager( const PvServerManager& ) = delete;
    PvServerManager& operator=( const PvServerManager& ) = delete;

    /// @return The managed server handle, nullptr if no server was created.
    simple_pv_handle
    get( ) const
    {
        return server_;
    }

private:
    simple_pv_handle server_;
};
/*!
 * @brief Builds the names of the EDCU status channels from a prefix.
 * @note Every piece of knowledge about how these channel names are spelled
 * lives here, so the rest of the code cannot drift (for example by dropping
 * an 'N' from a suffix).
 */
class InternalChanNames
{
public:
    /*!
     * @brief Build all six full channel names as prefix + suffix.
     * @param prefix The text placed in front of every suffix
     */
    explicit InternalChanNames( const std::string& prefix )
        : prefix_{ prefix },
          chan_conn_{ create_string_vector( prefix, chan_conn_suffix( ) ) },
          chan_noconn_{ create_string_vector( prefix,
                                              chan_noconn_suffix( ) ) },
          chan_cnt_{ create_string_vector( prefix, chan_cnt_suffix( ) ) },
          uptime_{ create_string_vector( prefix, uptime_suffix( ) ) },
          data_rate_{ create_string_vector( prefix, data_rate_suffix( ) ) },
          gpstime_{ create_string_vector( prefix, gpstime_suffix( ) ) }
    {
    }

    /// @return The prefix given at construction.
    const std::string&
    prefix( ) const
    {
        return prefix_;
    }

    // Full (prefix + suffix) names as null terminated C strings.  The
    // pointers refer to storage owned by this object.
    const char*
    chan_conn( ) const
    {
        return chan_conn_.data( );
    }

    const char*
    chan_noconn( ) const
    {
        return chan_noconn_.data( );
    }

    const char*
    chan_cnt( ) const
    {
        return chan_cnt_.data( );
    }

    const char*
    uptime( ) const
    {
        return uptime_.data( );
    }

    const char*
    data_rate( ) const
    {
        return data_rate_.data( );
    }

    const char*
    gpstime( ) const
    {
        return gpstime_.data( );
    }

    // The bare suffixes, available without constructing an object.
    static const char*
    chan_conn_suffix( )
    {
        static const char text[] = "CHAN_CONN";
        return text;
    }

    static const char*
    chan_noconn_suffix( )
    {
        static const char text[] = "CHAN_NOCON";
        return text;
    }

    static const char*
    chan_cnt_suffix( )
    {
        static const char text[] = "CHAN_CNT";
        return text;
    }

    static const char*
    uptime_suffix( )
    {
        static const char text[] = "UPTIME_SECONDS";
        return text;
    }

    static const char*
    data_rate_suffix( )
    {
        static const char text[] = "DATA_RATE_KB_PER_S";
        return text;
    }

    static const char*
    gpstime_suffix( )
    {
        static const char text[] = "GPS";
        return text;
    }

private:
    /*!
     * @brief Concatenate prefix and name into a null terminated char buffer.
     * @return A vector holding prefix, then name, then a single '\0'
     */
    static std::vector< char >
    create_string_vector( const std::string& prefix, const std::string& name )
    {
        std::vector< char > joined;
        joined.reserve( prefix.size( ) + name.size( ) + 1 );
        joined.insert( joined.end( ), prefix.begin( ), prefix.end( ) );
        joined.insert( joined.end( ), name.begin( ), name.end( ) );
        joined.push_back( '\0' );
        return joined;
    }

    std::string prefix_{ };
    // The names are stored as char vectors (not std::string) so that each
    // one is a fixed buffer that can be handed out as a const char* once it
    // has been initialized.
    std::vector< char > chan_conn_{ };
    std::vector< char > chan_noconn_{ };
    std::vector< char > chan_cnt_{ };
    std::vector< char > uptime_{ };
    std::vector< char > data_rate_{ };
    std::vector< char > gpstime_{ };
};
// Function prototypes
// ****************************************************************************************
int checkFileCrc( const char* );
/*!
 * @brief Storage for one channel sample.  Which member is meaningful is
 * decided by the channel's entry in daqd_c::channel_type.
 */
union edc_data_t
{
    int16_t data_int16; ///< value of a 16 bit integer channel
    int32_t data_int32; ///< value of a 32 bit integer channel
    float   data_float32; ///< value of a 32 bit float channel
    double  data_float64; ///< value of a 64 bit float channel
};
/*!
 * @brief A channel sample together with the EPICS timestamp it was
 * recorded with.  A new object starts with a zeroed value and a timestamp
 * of 0 seconds / 0 nanoseconds.
 */
struct edc_timestamped_data_t
{
    edc_data_t     data; ///< the latest accepted value
    epicsTimeStamp timestamp; ///< timestamp that came with that value

    edc_timestamped_data_t( ) : data( ), timestamp( )
    {
        timestamp.nsec = 0;
        timestamp.secPastEpoch = 0;
    }
};
/// CRC slot for the channel ini file: main() passes its address to
/// edcuCreateChanList (which forwards it to parseConfigFile), and
/// edcuWriteData copies the value into dipc->crc on every cycle.
unsigned long daqFileCrc;
/*!
 * @brief The state of the EDC: the channel list, the latest value of each
 * channel, connection bookkeeping and a few statistics.
 * @note The per channel arrays are fixed size (EDCU_MAX_CHANS entries);
 * only the first num_chans entries are in use.
 */
class daqd_c
{
public:
    /*!
     * @brief Zero the counters, mark every channel type as _undefined and
     * every channel status as 0xbad (not connected).
     * @note The initializer list follows the member declaration order
     * (data_rate before uptime) so that it matches the order in which the
     * members are really initialized.
     */
    daqd_c( )
        : num_chans( 0 ), con_chans( 0 ), val_events( 0 ), con_events( 0 ),
          channel_type( ), channel_value( ), channel_name( ),
          channel_status( ), gpsTime( 0 ), epicsSync( 0 ), prefix( nullptr ),
          dcuid( 0 ), data_rate( 0 ), uptime( 0 )
    {
        std::fill(
            std::begin( channel_type ), std::end( channel_type ), _undefined );
        std::fill(
            std::begin( channel_status ), std::end( channel_status ), 0xbad );
    }

    int num_chans; ///< number of channels read from the ini file
    int con_chans; ///< number of channels currently counted as connected
    int val_events; ///< count of subscription (value) callbacks received
    int con_events; ///< count of connection callbacks received
    daq_data_t             channel_type[ EDCU_MAX_CHANS ];
    edc_timestamped_data_t channel_value[ EDCU_MAX_CHANS ];
    char                   channel_name[ EDCU_MAX_CHANS ][ 64 ];
    int channel_status[ EDCU_MAX_CHANS ]; ///< 0 = connected, 0xbad = not
    long        gpsTime; ///< gps second of the cycle being written
    long        epicsSync; ///< 16Hz cycle number of the cycle being written
    const char* prefix; ///< prefix of the internal status channel names
    int         dcuid; ///< dcu id found in the ini file
    int         data_rate; ///< data rate in KB/s (computed in main)
    int         uptime; ///< incremented by main each time cycle 15 is written
};
// Indices into the daqd_edcu1 channel arrays of the internally generated
// status channels.  Each stays at -1 unless edcuCreateChanList finds the
// corresponding channel name in the ini file; edcuLoadSpecial ignores
// negative indices.
int num_chans_index = -1;
int con_chans_index = -1;
int nocon_chans_index = -1;
int uptime_index = -1;
int data_rate_index = -1;
int gpstime_index = -1;
// Number of internal status channels found by edcuCreateChanList.
int internal_channel_count = 0;
// The single EDC state object shared by the callbacks and the main loop.
daqd_c daqd_edcu1;
// Pointers into the DAQ shared memory; dipc, shmDataPtr and shmTpTable are
// set by edcuInitialize, which also resets sipc to nullptr.
static struct rmIpcStr* dipc;
static struct rmIpcStr* sipc;
static char* shmDataPtr;
static struct cdsDaqNetGdsTpNum* shmTpTable;
// NOTE(review): edcuWriteData declares its own local buf_size, which hides
// this one inside that function.
static const int buf_size = DAQ_DCU_BLOCK_SIZE;
static const int header_size =
sizeof( struct rmIpcStr ) + sizeof( struct cdsDaqNetGdsTpNum );
// NOTE(review): symmetricom_fd, timemarks and nextTrig are not referenced
// by any code visible in this part of the file - confirm before relying on
// them.  The timemarks entries step by 62500 * 1000 starting at 1000 * 1000.
static int symmetricom_fd = -1;
int timemarks[ 16 ] = { 1000 * 1000, 63500 * 1000, 126000 * 1000,
188500 * 1000, 251000 * 1000, 313500 * 1000,
376000 * 1000, 438500 * 1000, 501000 * 1000,
563500 * 1000, 626000 * 1000, 688500 * 1000,
751000 * 1000, 813500 * 1000, 876000 * 1000,
938500 * 1000 };
int nextTrig = 0;
/*!
* @brief EdcuClock the clock for the standalone edc
* @details This class abstracts the clock allowing multiple sources to be
* selected from. Currently this will read form a mbuf (ie the time output of a
* LIGO model) or from the gpstime device or from the system clock via gpsclock.
*/
class EdcuClock
{
public:
EdcuClock( const char* sync_source, int wait_ms )
{
if ( sync_source != nullptr && strcmp( sync_source, "-" ) != 0 )
{
auto sync_addr = (void*)findSharedMemory( (char*)sync_source );
sipc_ = (volatile struct rmIpcStr*)( (char*)sync_addr +
CDS_DAQ_NET_IPC_OFFSET );
}
else
{
gps_clock_ =
std::unique_ptr< GPS::gps_clock >( new GPS::gps_clock( 0 ) );
}
}
GPS::gps_time
now( )
{
if ( sipc_ )
{
cycle_ = reinterpret_cast< volatile std::atomic< unsigned int >* >(
&( sipc_->cycle ) )
->load( );
return GPS::gps_time( sipc_->bp[ cycle_ ].timeSec,
cycle_ * ( NS_IN_16_HZ ) );
}
else
{
auto tmp = gps_clock_->now( );
cycle_ = tmp.nanosec / NS_IN_16_HZ;
return tmp;
}
}
void
clear_cycle( ) noexcept
{
cycle_ = 0;
}
int
cycle( ) const noexcept
{
return cycle_;
}
EdcuClock( EdcuClock&& other ) = default;
EdcuClock( const EdcuClock& ) = delete;
EdcuClock& operator=( const EdcuClock& ) = delete;
EdcuClock& operator=( EdcuClock&& other ) = default;
private:
int cycle_{ 0 };
std::unique_ptr< GPS::gps_clock > gps_clock_{ nullptr };
volatile struct rmIpcStr* sipc_{ nullptr };
};
/*!
 * @brief A snapshot of the values used for diagnostic output.
 * @details A default constructed block has all counters at 0 and every
 * status entry set to 0xbad.  Copies duplicate every field, including the
 * whole status array.
 */
struct diag_info_block
{
    diag_info_block( )
        : con_chans( 0 ), nocon_chans( 0 ), uptime( 0 ), data_rate( 0 ),
          gpstime( 0 ), status( )
    {
        for ( int& entry : status )
        {
            entry = 0xbad;
        }
    }

    diag_info_block( const diag_info_block& other )
        : con_chans( other.con_chans ), nocon_chans( other.nocon_chans ),
          uptime( other.uptime ), data_rate( other.data_rate ),
          gpstime( other.gpstime ), status( )
    {
        std::copy( other.status, other.status + EDCU_MAX_CHANS, status );
    }

    diag_info_block&
    operator=( const diag_info_block& other )
    {
        con_chans = other.con_chans;
        nocon_chans = other.nocon_chans;
        uptime = other.uptime;
        data_rate = other.data_rate;
        gpstime = other.gpstime;
        std::copy( other.status, other.status + EDCU_MAX_CHANS, status );
        return *this;
    }

    int con_chans; ///< number of connected channels
    int nocon_chans; ///< number of channels that are not connected
    int uptime; ///< uptime counter
    int data_rate; ///< data rate
    int gpstime; ///< gps time
    int status[ EDCU_MAX_CHANS ]; ///< per channel status, 0xbad by default
};
/// Fixed capacity (DIAG_QUEUE_DEPTH) single-producer/single-consumer queue
/// of diag_info_block pointers.
using diag_queue_t =
    boost::lockfree::spsc_queue< diag_info_block*,
                                 boost::lockfree::capacity< DIAG_QUEUE_DEPTH > >;
/*!
 * @brief References to the pair of queues used to pass diag_info_block
 * pointers between the main thread and the diag thread.
 * @note Only references are held; the queues themselves must outlive this
 * object.
 */
struct diag_thread_queues
{
    diag_queue_t& msg_queue; ///< filled blocks, pushed by update_diag_info
    diag_queue_t& free_queue; ///< blocks available to be filled

    diag_thread_queues( diag_queue_t& message_queue,
                        diag_queue_t& free_list_queue )
        : msg_queue( message_queue ), free_queue( free_list_queue )
    {
    }
};
/*!
 * @brief Everything the diag thread is given at start up: the address to
 * use and the queues shared with the main thread.
 */
struct diag_thread_args
{
    /// Use an explicit hostname and port.
    diag_thread_args( const std::string& hostname,
                      int                port,
                      diag_queue_t&      message_queue,
                      diag_queue_t&      free_list_queue )
        : address( hostname, port ), queues( message_queue, free_list_queue )
    {
    }

    /// Use the default address, 127.0.0.1 port 9000.
    diag_thread_args( diag_queue_t& message_queue,
                      diag_queue_t& free_list_queue )
        : diag_thread_args(
              std::string( "127.0.0.1" ), 9000, message_queue, free_list_queue )
    {
    }

    std::pair< std::string, int > address; ///< (hostname, port)
    diag_thread_queues            queues;
};
/*!
 * @brief An output stream for a rapidjson writer that stores nothing.  It
 * only counts the characters given to Put(), which yields the number of
 * bytes needed to hold the encoded output.
 * @note Only the output half of the stream interface is usable; Peek, Take,
 * Tell, PutBegin and PutEnd throw std::runtime_error.
 */
class SizeCalcJsonStream
{
public:
    typedef char Ch;

    SizeCalcJsonStream( ) : count_( 0 )
    {
    }

    Ch
    Peek( ) const
    {
        unsupported( "Peek not implemented" );
    }

    Ch
    Take( )
    {
        unsupported( "Take not implemented" );
    }

    size_t
    Tell( )
    {
        unsupported( "Tell not implemented" );
    }

    Ch*
    PutBegin( )
    {
        unsupported( "PutBegin not implemented" );
    }

    /// Count one more character; the character itself is discarded.
    void
    Put( Ch c )
    {
        count_ += 1;
    }

    /// Nothing is buffered, so there is nothing to flush.
    void
    Flush( )
    {
    }

    size_t
    PutEnd( Ch* begin )
    {
        unsupported( "PutEnd not implemented" );
    }

    /// @return Always true, there is no buffer that could fill up.
    bool
    SpaceAvailable( ) const
    {
        return true;
    }

    /// @return The number of characters passed to Put() so far.
    std::size_t
    GetSize( ) const
    {
        return count_;
    }

private:
    /// Report an operation that this stream does not provide.
    [[noreturn]] static void
    unsupported( const char* message )
    {
        throw std::runtime_error( message );
    }

    std::size_t count_;
};
/*!
 * @brief An output stream for a rapidjson writer that writes into a caller
 * supplied, fixed size buffer and can report whether a useful amount of
 * space is left.
 * @tparam MIN_SIZE SpaceAvailable() is true only while strictly more than
 * this many bytes remain.
 * @note Only the output half of the stream interface is usable; Peek, Take,
 * Tell, PutBegin and PutEnd throw std::runtime_error.  The buffer is not
 * owned by the stream.
 */
template < std::size_t MIN_SIZE >
class FixedSizeJsonStream
{
public:
    typedef char Ch;

    /*!
     * @param begin First byte of the buffer
     * @param end One past the last byte of the buffer
     */
    FixedSizeJsonStream( char* begin, char* end )
        : begin_( begin ), cur_( begin ), end_( end )
    {
    }

    Ch
    Peek( ) const
    {
        throw std::runtime_error( "Peek not implemented" );
    }

    Ch
    Take( )
    {
        throw std::runtime_error( "Take not implemented" );
    }

    size_t
    Tell( )
    {
        throw std::runtime_error( "Tell not implemented" );
    }

    Ch*
    PutBegin( )
    {
        throw std::runtime_error( "PutBegin not implemented" );
    }

    /*!
     * @brief Append one character.
     * @throws std::out_of_range when the buffer is already full
     */
    void
    Put( Ch c )
    {
        if ( cur_ < end_ )
        {
            *cur_ = c;
            ++cur_;
        }
        else
        {
            throw std::out_of_range( "The FixedSizeBuffer has overflown" );
        }
    }

    /// Data goes straight to the buffer, so there is nothing to flush.
    void
    Flush( )
    {
    }

    size_t
    PutEnd( Ch* begin )
    {
        throw std::runtime_error( "PutEnd not implemented" );
    }

    /*!
     * @brief Move the write position forward by count bytes without writing.
     * @note No bounds check is done here; the caller is responsible for
     * staying inside the buffer.
     */
    void
    AdvanceCounter( std::size_t count )
    {
        cur_ += count;
    }

    /*!
     * @brief Report if more than MIN_SIZE bytes are left in the buffer.
     * @return false when the write position is at or past the end of the
     * buffer, or when MIN_SIZE or fewer bytes remain.
     * @note The position is checked before the subtraction so that a
     * position past the end (possible via AdvanceCounter) is never turned
     * into a huge unsigned "remaining" value.
     */
    bool
    SpaceAvailable( ) const
    {
        if ( cur_ >= end_ )
        {
            return false;
        }
        return static_cast< std::size_t >( end_ - cur_ ) > MIN_SIZE;
    }

    /// @return The start of the buffer (not null terminated by the stream).
    Ch*
    GetString( ) const
    {
        return begin_;
    }

    /// @return The number of bytes between the start and the write position.
    std::size_t
    GetSize( ) const
    {
        return cur_ - begin_;
    }

    /// Move the write position back to the start of the buffer.
    void
    Reset( )
    {
        cur_ = begin_;
    }

private:
    Ch* begin_;
    Ch* cur_;
    Ch* end_;
};
/*!
 * @brief The per client state of a diagnostic connection: the socket, a
 * private copy of the diag information, and a fixed size output buffer with
 * a json writer attached to it.
 * @note buffer_stream and writer refer to members declared before them, so
 * the member order must be kept.
 */
struct diag_client_conn
{
    explicit diag_client_conn( io_context_t& io )
        : io_context( io ),
          socket( io ),
          data( ),
          buffer( ),
          buffer_stream( buffer.data( ), buffer.data( ) + buffer.size( ) ),
          writer( buffer_stream ),
          list_progress( 0 )
    {
    }

    io_context_t&                                   io_context;
    tcp::socket                                     socket;
    diag_info_block                                 data;
    std::array< char, 32000 >                       buffer;
    FixedSizeJsonStream< 256 >                      buffer_stream;
    rapidjson::Writer< FixedSizeJsonStream< 256 > > writer;
    int                                             list_progress;
};
// forward
void diag_thread_mainloop( diag_thread_args* args,
InternalChanNames* internal_names );
void update_diag_info( diag_thread_queues& queues );
// End Header ************************************************************
//
// **************************************************************************
/*!
 * @brief Channel Access connection callback, keeps the connected channel
 * count and the per channel status up to date.
 * @param args The connection event; the channel's user pointer is the
 * address of its entry in daqd_edcu1.channel_status.
 * @note Several disconnect events in a row, with no connect in between,
 * have been seen in practice.  The counter is therefore only adjusted when
 * the status really changes, otherwise the count would drift.
 */
void
connectCallback( struct connection_handler_args args )
{
    const bool connected = ( args.op == CA_OP_CONN_UP );
    const int  updated_status = ( connected ? 0 : 0xbad );
    int*       status_slot = (int*)ca_puser( args.chid );

    if ( *status_slot != updated_status )
    {
        *status_slot = updated_status;
        daqd_edcu1.con_chans += ( connected ? 1 : -1 );
    }
    ++daqd_edcu1.con_events;
}
/*!
 * @brief Strict "later than" comparison of two EPICS timestamps.
 * @return true if t1 is later than t2; the seconds are compared first and
 * the nanoseconds only break a tie.
 */
inline bool
operator>( const epicsTimeStamp t1, const epicsTimeStamp t2 )
{
    if ( t1.secPastEpoch != t2.secPastEpoch )
    {
        return t1.secPastEpoch > t2.secPastEpoch;
    }
    return t1.nsec > t2.nsec;
}
// **************************************************************************
void
subscriptionHandler( struct event_handler_args args )
{
// **************************************************************************
daqd_edcu1.val_events++;
if ( args.status != ECA_NORMAL )
{
return;
}
switch ( args.type )
{
case DBR_TIME_SHORT: {
dbr_time_short* dbr = (dbr_time_short*)( args.dbr );
edc_timestamped_data_t* edc_data =
( (edc_timestamped_data_t*)( args.usr ) );
int i = edc_data - &( daqd_edcu1.channel_value[ 0 ] );
if ( dbr->stamp > edc_data->timestamp )
{
edc_data->data.data_int16 = dbr->value;
edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
edc_data->timestamp.nsec = dbr->stamp.nsec;
}
}
break;
case DBR_TIME_LONG: {
dbr_time_long* dbr = (dbr_time_long*)( args.dbr );
edc_timestamped_data_t* edc_data =
( (edc_timestamped_data_t*)( args.usr ) );
if ( dbr->stamp > edc_data->timestamp )
{
edc_data->data.data_int32 = dbr->value;
edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
edc_data->timestamp.nsec = dbr->stamp.nsec;
}
}
break;
case DBR_TIME_FLOAT: {
dbr_time_float* dbr = (dbr_time_float*)( args.dbr );
edc_timestamped_data_t* edc_data =
( (edc_timestamped_data_t*)( args.usr ) );
if ( dbr->stamp > edc_data->timestamp )
{
edc_data->data.data_float32 = dbr->value;
edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
edc_data->timestamp.nsec = dbr->stamp.nsec;
}
}
break;
case DBR_TIME_DOUBLE: {
dbr_time_double* dbr = (dbr_time_double*)( args.dbr );
edc_timestamped_data_t* edc_data =
( (edc_timestamped_data_t*)( args.usr ) );
if ( dbr->stamp > edc_data->timestamp )
{
edc_data->data.data_float64 = dbr->value;
edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
edc_data->timestamp.nsec = dbr->stamp.nsec;
}
}
break;
default:
printf( "Arg type unknown\n" );
break;
}
}
/*!
 * @brief Report if the edc can record a channel of the given data type.
 * @return true for 16 bit integers, 32 bit integers, 32 bit floats and
 * 64 bit doubles; false for every other value.
 */
bool
valid_data_type( daq_data_t datatype )
{
    const bool supported =
        ( datatype == _16bit_integer || datatype == _32bit_integer ||
          datatype == _32bit_float || datatype == _64bit_double );
    return supported;
}
/*!
 * @brief Map a daq data type to the DBR_TIME_* request type used for the
 * Channel Access subscription.
 * @throws std::runtime_error for any data type other than the four
 * supported ones
 */
int
daq_data_t_to_epics( daq_data_t datatype )
{
    if ( datatype == _16bit_integer )
    {
        return DBR_TIME_SHORT;
    }
    if ( datatype == _32bit_integer )
    {
        return DBR_TIME_LONG;
    }
    if ( datatype == _32bit_float )
    {
        return DBR_TIME_FLOAT;
    }
    if ( datatype == _64bit_double )
    {
        return DBR_TIME_DOUBLE;
    }
    throw std::runtime_error( "Unexpected data type given" );
}
/*!
 * @brief Accumulation step used by calculate_data_size.
 * @param cur The running total
 * @param data_type The type of the next channel
 * @return cur plus data_type_size( data_type )
 */
std::size_t
accumulte_daq_sizes( std::size_t cur, daq_data_t data_type )
{
    const std::size_t bytes_for_type = data_type_size( data_type );
    return cur + bytes_for_type;
}
std::size_t
calculate_data_size( const daqd_c& edc )
{
return std::accumulate( &( edc.channel_type[ 0 ] ),
&( edc.channel_type[ 0 ] ) + edc.num_chans,
0,
accumulte_daq_sizes );
}
/*!
 * @brief Report if a channel name is one of the internally generated
 * status channels, ie the edc prefix followed by one of the
 * InternalChanNames suffixes.
 * @param edc Supplies the prefix; a null prefix is treated as ""
 * @param channel_name The name to test
 * @return true if the name is prefix + one of the six suffixes
 */
bool
channel_is_edcu_special_chan( daqd_c* edc, const char* channel_name )
{
    const char*  prefix = ( edc->prefix ? edc->prefix : "" );
    const size_t pref_len = strlen( prefix );

    // the name must be longer than the prefix and start with it
    if ( strlen( channel_name ) <= pref_len ||
         strncmp( prefix, channel_name, pref_len ) != 0 )
    {
        return false;
    }

    const char* const remainder = channel_name + pref_len;
    const char* const suffixes[] = {
        InternalChanNames::chan_conn_suffix( ),
        InternalChanNames::chan_noconn_suffix( ),
        InternalChanNames::chan_cnt_suffix( ),
        InternalChanNames::uptime_suffix( ),
        InternalChanNames::data_rate_suffix( ),
        InternalChanNames::gpstime_suffix( ),
    };
    return std::any_of( std::begin( suffixes ),
                        std::end( suffixes ),
                        [ remainder ]( const char* suffix ) {
                            return strcmp( remainder, suffix ) == 0;
                        } );
}
int
channel_parse_callback( char* channel_name,
struct CHAN_PARAM* params,
void* user )
{
daqd_c* edc = reinterpret_cast< daqd_c* >( user );
if ( !edc || !channel_name || !params )
{
return 0;
}
if ( edc->num_chans >= EDCU_MAX_CHANS )
{
std::cerr << "Too many channels, aborting\n";
exit( 1 );
}
if ( strlen( channel_name ) >=
sizeof( edc->channel_name[ edc->num_chans ] ) )
{
std::cerr << "Channel name is too long '" << channel_name << "'\n";
exit( 1 );
}
if ( params->datarate != 16 )
{
std::cerr << "EDC channels may only be 16Hz\n";
exit( 1 );
}
if ( params->dcuid != edc->dcuid && edc->dcuid >= 0 )
{
std::cerr << "The edc can only have a single dcuid in its file\n";
exit( 1 );
}
if ( edc->dcuid < 0 )
{
edc->dcuid = params->dcuid;
}
daq_data_t daq_data_type = static_cast< daq_data_t >( params->datatype );
if ( !valid_data_type( daq_data_type ) )
{
std::cerr << "Invalid data type given for " << channel_name << "\n";
exit( 1 );
}
if ( channel_is_edcu_special_chan( edc, channel_name ) )
{
if ( daq_data_type != _32bit_integer && daq_data_type != _32bit_float )
{
std::cerr
<< "The edcu special variables (EDCU_CHAN_CONN/CNT/NOCON) "
"must be 32 bit ints ("
<< static_cast< int >( _32bit_integer ) << ") or 32 bit floats("
<< static_cast< int >( _32bit_float ) << "\n";
exit( 1 );
}
}
edc->channel_type[ edc->num_chans ] = daq_data_type;
strncpy( edc->channel_name[ edc->num_chans ],
channel_name,
sizeof( edc->channel_name[ edc->num_chans ] ) );
++( edc->num_chans );
return 1;
}
// **************************************************************************
/*!
 * @brief Read the channel list from the ini file and set up a Channel
 * Access connection and subscription for every regular channel.
 * @details Channels whose name matches one of the internal status channel
 * names are not connected to; their index is recorded in the matching
 * *_index global and their status is set to 0.  The number of internal
 * channels found is added to daq.con_chans at the end.
 * @param daq The channel list to fill
 * @param daqfilename The ini file to parse
 * @param crc Passed to parseConfigFile; may be null, in which case a local
 * dummy is used
 * @param internal_chans The names of the internal status channels
 * @note Exits when no channels are found or the CA context cannot be
 * created.  A channel that cannot be created is reported and skipped; no
 * subscription is attempted for it as there is no valid channel id.
 */
void
edcuCreateChanList( daqd_c&                  daq,
                    const char*              daqfilename,
                    unsigned long*           crc,
                    const InternalChanNames& internal_chans )
{
    // **************************************************************************
    unsigned long dummy_crc = 0;
    if ( !crc )
    {
        crc = &dummy_crc;
    }

    // name of each internal status channel and where to record its index
    struct special_chan_t
    {
        const char* name;
        int*        index;
    };
    const special_chan_t special_chans[] = {
        { internal_chans.chan_cnt( ), &num_chans_index },
        { internal_chans.chan_conn( ), &con_chans_index },
        { internal_chans.chan_noconn( ), &nocon_chans_index },
        { internal_chans.uptime( ), &uptime_index },
        { internal_chans.data_rate( ), &data_rate_index },
        { internal_chans.gpstime( ), &gpstime_index },
    };

    daq.num_chans = 0;
    daq.dcuid = -1;
    parseConfigFile( const_cast< char* >( daqfilename ),
                     crc,
                     channel_parse_callback,
                     -1,
                     (char*)0,
                     reinterpret_cast< void* >( &daq ) );
    if ( daq.num_chans < 1 )
    {
        std::cerr << "No channels to record, aborting\n";
        exit( 1 );
    }
    std::cout << "CRC data length = " << calculate_data_size( daq ) << "\n";

    if ( ca_context_create( ca_enable_preemptive_callback ) != ECA_NORMAL )
    {
        fprintf( stderr, "Error creating the EPICS CA context\n" );
        exit( 1 );
    }

    for ( int i = 0; i < daq.num_chans; i++ )
    {
        bool is_special = false;
        for ( const special_chan_t& special : special_chans )
        {
            if ( strcmp( daq.channel_name[ i ], special.name ) == 0 )
            {
                *( special.index ) = i;
                internal_channel_count = internal_channel_count + 1;
                daq.channel_status[ i ] = 0;
                is_special = true;
                break;
            }
        }
        if ( is_special )
        {
            continue;
        }

        daq.channel_status[ i ] = 0xbad;
        chid channel_id;
        int  status = ca_create_channel( daq.channel_name[ i ],
                                        connectCallback,
                                        (void*)&( daq.channel_status[ i ] ),
                                        0,
                                        &channel_id );
        if ( status != ECA_NORMAL )
        {
            fprintf( stderr,
                     "Error creating connection to %s\n",
                     daq.channel_name[ i ] );
            // channel_id was not set, so it must not be subscribed to
            continue;
        }
        status = ca_create_subscription(
            daq_data_t_to_epics( daq.channel_type[ i ] ),
            0,
            channel_id,
            DBE_VALUE,
            subscriptionHandler,
            (void*)&( daq.channel_value[ i ] ),
            0 );
        if ( status != ECA_NORMAL )
        {
            fprintf( stderr,
                     "Error creating subscription for %s\n",
                     daq.channel_name[ i ] );
        }
    }
    // the internal channels always count as connected
    daq.con_chans = daq.con_chans + internal_channel_count;
}
void
edcuLoadSpecial( int index, int value )
{
if ( index >= 0 )
{
switch ( daqd_edcu1.channel_type[ index ] )
{
case _32bit_integer:
daqd_edcu1.channel_value[ index ].data.data_int32 = value;
break;
case _32bit_float:
daqd_edcu1.channel_value[ index ].data.data_float32 =
static_cast< float >( value );
break;
}
}
}
/*!
* @brief copy the data from daqd_edcu1.channel_value to the output buffer
* @param daqData The output buffer
* @return The number of bytes transferred.
*/
int
copyDaqData( char* daqData )
{
char* dataStart = daqData;
int ii = 0;
for ( ii = 0; ii < daqd_edcu1.num_chans; ++ii )
{
switch ( daqd_edcu1.channel_type[ ii ] )
{
case _16bit_integer: {
*reinterpret_cast< int16_t* >( daqData ) =
daqd_edcu1.channel_value[ ii ].data.data_int16;
daqData += sizeof( int16_t );
break;
}
case _32bit_integer: {
*reinterpret_cast< int32_t* >( daqData ) =
daqd_edcu1.channel_value[ ii ].data.data_int32;
daqData += sizeof( int32_t );
break;
}
case _32bit_float: {
*reinterpret_cast< float* >( daqData ) =
daqd_edcu1.channel_value[ ii ].data.data_float32;
daqData += sizeof( float );
break;
}
case _64bit_double: {
*reinterpret_cast< double* >( daqData ) =
daqd_edcu1.channel_value[ ii ].data.data_float64;
daqData += sizeof( double );
break;
}
default:
std::cerr << "Unknown data type found, the edc does not know how "
"to layout the data, aborting\n";
exit( 1 );
}
}
return static_cast< int >( daqData - dataStart );
}
// **************************************************************************
/*!
 * @brief Publish one cycle of data to the DAQ shared memory.
 * @details The internal status channels are refreshed, the channel values
 * are copied into the data block of the given cycle, the IPC header and the
 * block properties are filled in, the test point table is cleared and,
 * last of all, dipc->cycle is set to the cycle number.
 * @param daqBlockNum The cycle number, selects the data block to write
 * @param cycle_gps_time The gps second stored with the block
 * @param dcuId The dcu id stored in the IPC header
 */
void
edcuWriteData( int daqBlockNum, unsigned long cycle_gps_time, int dcuId )
// **************************************************************************
{
    const int connected = daqd_edcu1.con_chans;
    edcuLoadSpecial( num_chans_index, daqd_edcu1.num_chans );
    edcuLoadSpecial( con_chans_index, connected );
    edcuLoadSpecial( nocon_chans_index, daqd_edcu1.num_chans - connected );
    edcuLoadSpecial( uptime_index, daqd_edcu1.uptime );
    edcuLoadSpecial( data_rate_index, daqd_edcu1.data_rate );
    edcuLoadSpecial( gpstime_index, static_cast< int >( daqd_edcu1.gpsTime ) );

    const int          block_bytes = DAQ_DCU_BLOCK_SIZE;
    char*              block_start = shmDataPtr + ( block_bytes * daqBlockNum );
    const unsigned int bytes_written = copyDaqData( block_start );

    dipc->dcuId = dcuId;
    dipc->crc = daqFileCrc;
    dipc->dataBlockSize = bytes_written;
    dipc->channelCount = daqd_edcu1.num_chans;

    dipc->bp[ daqBlockNum ].cycle = daqBlockNum;
    // the per block crc field is given the data size
    dipc->bp[ daqBlockNum ].crc = bytes_written;
    dipc->bp[ daqBlockNum ].timeSec = (unsigned int)cycle_gps_time;
    // the nanosecond field is given the cycle number
    dipc->bp[ daqBlockNum ].timeNSec = (unsigned int)daqBlockNum;

    shmTpTable->count = 0;
    shmTpTable->tpNum[ 0 ] = 0;

    dipc->cycle = daqBlockNum; // Triggers sending of data by mx_stream.
}
// **************************************************************************
/*!
 * @brief Locate the DAQ shared memory and create the clock.
 * @details The buffer named mbuf_name + "_daq" is looked up with
 * findSharedMemory and the global pointers dipc, shmDataPtr and shmTpTable
 * are set to their offsets inside it.  sipc is reset to nullptr.
 * @param mbuf_name The base name of the buffer to write to
 * @param sync_source Passed on to the EdcuClock constructor
 * @param delay_ms Passed on to the EdcuClock constructor
 * @return The clock to use for timing
 */
EdcuClock
edcuInitialize( const std::string& mbuf_name,
                const char*        sync_source,
                int                delay_ms )
// **************************************************************************
{
    sipc = nullptr;

    // findSharedMemory wants a mutable, null terminated name
    std::string shmem_name( mbuf_name );
    shmem_name += "_daq";

    // Find start of DAQ shared memory
    char* const dcu_base = (char*)findSharedMemory( &shmem_name[ 0 ] );

    // Find the IPC area to communicate with mxstream
    dipc = (struct rmIpcStr*)( dcu_base + CDS_DAQ_NET_IPC_OFFSET );
    // Find the DAQ data area.
    shmDataPtr = dcu_base + CDS_DAQ_NET_DATA_OFFSET;
    // Find the test point table.
    shmTpTable =
        (struct cdsDaqNetGdsTpNum*)( dcu_base +
                                     CDS_DAQ_NET_GDS_TP_TABLE_OFFSET );

    return EdcuClock( sync_source, delay_ms );
}
/// Common routine to check file CRC.
/// The checksum is obtained by running the 'cksum' utility on the file.
/// @param[in] *fName Name of file to check.
/// @return File CRC (converted to int), -1 if fName is null or the file
/// cannot be stat'ed, or -99999 if the file exists but the output of
/// cksum could not be read.
/// @note The command line is built in a std::string, so the length of the
/// file name is not limited by a fixed buffer.  The name is passed to the
/// shell inside single quotes (embedded single quotes are escaped) and
/// after a '--', so spaces, shell meta characters or a leading '-' in the
/// name cannot alter the command.
int
checkFileCrc( const char* fName )
{
    struct stat statBuf;
    if ( fName == nullptr || stat( fName, &statBuf ) != 0 )
    {
        return ( -1 );
    }

    std::string command( "cksum -- '" );
    for ( const char* cur = fName; *cur != '\0'; ++cur )
    {
        if ( *cur == '\'' )
        {
            // close the quote, add an escaped quote, reopen the quote
            command += "'\\''";
        }
        else
        {
            command += *cur;
        }
    }
    command += "'";

    long  chkSum = -99999;
    FILE* pipePtr = popen( command.c_str( ), "r" );
    if ( pipePtr != NULL )
    {
        char line[ 256 ];
        // only parse the buffer if something was actually read into it
        if ( fgets( line, sizeof( line ), pipePtr ) != NULL )
        {
            sscanf( line, "%ld", &chkSum );
        }
        pclose( pipePtr );
    }
    return static_cast< int >( chkSum );
}
/*!
 * @brief Split a "host[:port]" string into its host and port.
 * @param str The text to parse; it is split at the first ':'
 * @return (host, port).  When there is no ':' the whole string is the host
 * and the port is 20222.
 * @note The port text is converted with std::stoi, which throws when no
 * number can be parsed.
 */
std::pair< std::string, int >
parse_address( const std::string& str )
{
    const int                    default_port = 20222;
    const std::string::size_type colon = str.find( ':' );
    if ( colon == std::string::npos )
    {
        return std::make_pair( str, default_port );
    }

    std::string host = str.substr( 0, colon );
    const int   port = std::stoi( str.substr( colon + 1 ) );
    return std::make_pair( std::move( host ), port );
}
/// Entry point of the standalone edc.
/// Parses the command line, attaches to the DAQ shared memory, reads the
/// channel list, starts the diagnostic thread and then writes one block of
/// channel data per 16Hz cycle, forever.
/// @return -1 if the argument parser cannot be created, the arguments
/// cannot be parsed or the delay is out of range.  The main loop itself
/// never returns.
int
main( int argc, char* argv[] )
{
    args_handle arg_parser = nullptr;

    // The diag blocks are passed between the main thread and the diag
    // thread through two queues; every block starts out on the free queue.
    diag_queue_t    diag_msg_queue;
    diag_queue_t    diag_free_queue;
    diag_info_block diag_info[ DIAG_QUEUE_DEPTH ];
    for ( int i = 0; i < DIAG_QUEUE_DEPTH; ++i )
    {
        diag_free_queue.push( &( diag_info[ i ] ) );
    }

    const char* daqsharedmemname = nullptr;
    const char* daqFile = nullptr;
    const char* listen_interface = nullptr;
    const char* sync_source = nullptr;
    int         delay_ms = 0;

    // NOTE(review): this zeroes the whole object after its constructor has
    // run, which also replaces the 0xbad fill of channel_status with 0.
    // edcuCreateChanList sets the status of every channel in use.
    memset( (void*)&daqd_edcu1, 0, sizeof( daqd_edcu1 ) );

    diag_thread_args diag_args( diag_msg_queue, diag_free_queue );

    {
        std::ostringstream os;
        os << "The standalone edc is used to record epics data and put it in a "
              "memory buffer which can be consumed by the daqd tools.\n"
              "Channels to record are listed in the input ini file.  The "
              "channels "
              "may be:\n"
              "\t* 16 bit ints (data type=1)\n"
              "\t* 32 bit ints (data type=2)\n"
              "\t* 32 bit floats (data type=4)\n"
              "\t* 64 bit floats (data type=5)\n"
              "The channels must all be set with a data rate of 16Hz.\n"
              "Some special channels are produced by the standalone edc, to "
              "record "
              "connection status "
              "(these channels may also be captured by the edc).\n"
              "These channels are only available if the prefix is specified.\n"
              "\t<prefix>"
           << InternalChanNames::chan_conn_suffix( ) << "\n\t<prefix>"
           << InternalChanNames::chan_noconn_suffix( )
           << "\n"
              "\t<prefix>"
           << InternalChanNames::chan_cnt_suffix( ) << "\n"
           << "\t<prefix>" << InternalChanNames::uptime_suffix( ) << "\n"
           << "\t<prefix>" << InternalChanNames::data_rate_suffix( ) << "\n"
           << "\t<prefix>" << InternalChanNames::gpstime_suffix( ) << "\n"
           << "The standalone edc requires the LIGO mbuf and gpstime modules "
              "to be loaded.\n";
        auto preamble = os.str( );
        arg_parser = args_create_parser( preamble.c_str( ) );
        if ( !arg_parser )
        {
            return -1;
        }
    }
    args_add_string_ptr( arg_parser,
                         'b',
                         ARGS_NO_LONG,
                         "buffer",
                         "The name of the mbuf to write to (note '_daq' will "
                         "be appended to the name).",
                         &daqsharedmemname,
                         "edc" );
    args_add_string_ptr( arg_parser,
                         'i',
                         ARGS_NO_LONG,
                         "ini",
                         "The ini file to read",
                         &daqFile,
                         "edc.ini" );
    args_add_int( arg_parser,
                  'w',
                  ARGS_NO_LONG,
                  "ms",
                  "Number of ms to wait after each 16Hz segment has started "
                  "before writing data "
                  "(may be negative to start early).  This is ignored when "
                  "taking time from an iop.",
                  &delay_ms,
                  -15 );
    args_add_string_ptr( arg_parser,
                         'p',
                         ARGS_NO_LONG,
                         "prefix",
                         "Prefix to add to the connection stats channel names",
                         &daqd_edcu1.prefix,
                         "" );
    args_add_string_ptr( arg_parser,
                         'l',
                         "listen",
                         "interface:port",
                         "Where to bind to for the diagnostic http output",
                         &listen_interface,
                         "127.0.0.1:9000" );
    args_add_string_ptr( arg_parser,
                         ARGS_NO_SHORT,
                         "sync-to",
                         "iop",
                         "Optional iop model to take timing from, if '-' then "
                         "use the gpstime driver",
                         &sync_source,
                         "-" );
    if ( args_parse( arg_parser, argc, argv ) < 0 )
    {
        return -1;
    }
    if ( delay_ms < -50 || delay_ms > 50 )
    {
        std::cout << "Please keep the delay value between [-50, 50]"
                  << std::endl;
        return -1;
    }
    if ( delay_ms != 0 && strcmp( sync_source, "-" ) != 0 )
    {
        std::cout << "Delays are ignored when syncing from an iop" << std::endl;
        delay_ms = 0;
    }

    InternalChanNames internal_channels( daqd_edcu1.prefix );

    diag_args.address = parse_address( listen_interface );

    // **********************************************
    //
    // EDCU STUFF
    // ********************************************************************************************************

    auto clock = edcuInitialize( daqsharedmemname, sync_source, delay_ms );
    edcuCreateChanList( daqd_edcu1, daqFile, &daqFileCrc, internal_channels );
    std::cout << "The edc dcuid = " << daqd_edcu1.dcuid << "\n";
    {
        // Lay the data out once in a scratch buffer (sized for the largest
        // data type) to learn the bytes per cycle: 16 cycles per second,
        // reported in KB.
        std::vector< char > tmp_buffer( daqd_edcu1.num_chans *
                                        sizeof( double ) );
        daqd_edcu1.data_rate =
            ( copyDaqData( tmp_buffer.data( ) ) * 16 ) / 1024;
    }

    daqd_edcu1.gpsTime = clock.now( ).sec;
    daqd_edcu1.epicsSync = 0;

    // daqFileCrc is an unsigned long, so it is printed with %lu
    printf( "DAQ file CRC = %lu \n", daqFileCrc );
    fprintf( stderr,
             "%s\n%s = %lu\n%s = %d",
             "EDCU code restart",
             "File CRC",
             daqFileCrc,
             "Chan Cnt",
             daqd_edcu1.num_chans );

    // The first transmission is at the start of the next second, or
    // |delay_ms| before the start of the second after that when the delay
    // is negative.
    GPS::gps_time time_step = GPS::gps_time( 0, NS_IN_16_HZ );
    GPS::gps_time transmit_time = clock.now( );
    ++transmit_time.sec;
    transmit_time.nanosec = 0;
    if ( delay_ms < 0 )
    {
        ++transmit_time.sec;
        transmit_time.nanosec = 1000000000 + ms_to_ns( delay_ms );
    }
    clock.clear_cycle( );

    // a positive delay is applied as a sleep in each cycle
    int sleep_per_cycle = ( delay_ms > 0 ? delay_ms : 0 );

    update_diag_info( diag_args.queues );
    std::thread diag_thread(
        diag_thread_mainloop, &diag_args, &internal_channels );

    // Start Infinite Loop
    // *******************************************************************************
    for ( ;; )
    {
        // poll the clock until the transmit time has been reached
        GPS::gps_time now = clock.now( );
        while ( now < transmit_time )
        {
            usleep( 500 );
            now = clock.now( );
        }
        usleep( sleep_per_cycle * 1000 );

        daqd_edcu1.gpsTime = now.sec;
        daqd_edcu1.epicsSync = clock.cycle( );

        edcuWriteData(
            daqd_edcu1.epicsSync, daqd_edcu1.gpsTime, daqd_edcu1.dcuid );

        transmit_time = transmit_time + time_step;

        // once per second (after the last cycle) bump the uptime and send
        // a fresh snapshot to the diag thread
        if ( clock.cycle( ) == 15 )
        {
            ++daqd_edcu1.uptime;
            update_diag_info( diag_args.queues );
        }
    }

    return ( 0 );
}
/*!
 * @brief Routine called from the main thread to hand a diag block to the
 * diag thread. The block is a snapshot of the channel and connection status
 * currently held in daqd_edcu1.
 * @details A spare block is taken from the free queue, filled in and pushed
 * onto the message queue. If the free queue has nothing available the call
 * returns without publishing anything.
 * @param queues The set of queues used to communicate between the threads
 */
void
update_diag_info( diag_thread_queues& queues )
{
    // No spare block has been handed back yet, so skip this snapshot.
    if ( queues.free_queue.read_available( ) == 0 )
    {
        return;
    }

    diag_info_block* block = nullptr;
    queues.free_queue.pop( &block, 1 );

    // Scalar summary values.
    block->con_chans = daqd_edcu1.con_chans;
    // Derived from the value just stored so both counts come from one read.
    block->nocon_chans = daqd_edcu1.num_chans - block->con_chans;
    block->uptime = daqd_edcu1.uptime;
    block->data_rate = daqd_edcu1.data_rate;
    block->gpstime = static_cast< int >( daqd_edcu1.gpsTime );

    // Per-channel status, limited to the channels that are in use.
    std::copy_n( std::begin( daqd_edcu1.channel_status ),
                 daqd_edcu1.num_chans,
                 std::begin( block->status ) );

    queues.msg_queue.push( block );
}
/*!
 * @brief Called periodically from the diag thread to pull diag information
 * out of the shared diag message queue.
 * @details If a block is waiting it is copied into dest and then pushed
 * back onto the free queue. If nothing is waiting dest is left untouched.
 * @param queues The queues used to communicate with the main thread
 * @param dest The data block to copy the diag information to.
 */
void
get_diag_info( diag_thread_queues& queues, diag_info_block& dest )
{
    if ( queues.msg_queue.read_available( ) > 0 )
    {
        diag_info_block* received = nullptr;
        queues.msg_queue.pop( &received, 1 );
        dest = *received;
        // Return the block so it can be refilled by update_diag_info.
        queues.free_queue.push( received );
    }
}
/*!
 * @brief Timer callback wrapping get_diag_info. It calls get_diag_info and
 * then re-arms the timer so that it is called again one second later.
 * @param timer the timer to re-arm
 * @param queues The message queues
 * @param dest where to write the diag info
 * @note The error code passed to the wait completion is not inspected; the
 * handler always re-arms itself.
 */
void
get_diag_info_handler( boost::asio::deadline_timer& timer,
                       diag_thread_queues&          queues,
                       diag_info_block&             dest )
{
    get_diag_info( queues, dest );

    const auto poll_interval = boost::posix_time::seconds( 1 );
    timer.expires_from_now( poll_interval );

    auto on_expiry =
        [ &timer, &queues, &dest ]( const boost::system::error_code& ) {
            get_diag_info_handler( timer, queues, dest );
        };
    timer.async_wait( std::move( on_expiry ) );
}
/*!
 * @brief Timer callback for the epics server. It calls
 * simple_pv_server_update and re-arms the timer so the call repeats.
 * @param timer The timer to use
 * @param server the pv server handle
 * @note The period is 1000 / 8 milliseconds, i.e. several activations per
 * second. The data does not change that often; the short period is there to
 * keep the epics network code responsive.
 */
void
epics_timer_handler( boost::asio::deadline_timer& timer,
                     simple_pv_handle             server )
{
    simple_pv_server_update( server );

    const auto update_period = boost::posix_time::milliseconds( 1000 / 8 );
    timer.expires_from_now( update_period );

    auto on_expiry = [ &timer, server ]( const boost::system::error_code& ) {
        epics_timer_handler( timer, server );
    };
    timer.async_wait( std::move( on_expiry ) );
}
// Forward declaration: handle_accept() below calls accept_loop() to keep
// accepting connections, and accept_loop() is only defined after it.
void accept_loop( io_context_t& io_context,
tcp::acceptor& acceptor,
const diag_info_block& cur_diag );
/*!
 * @brief Begin the json document: open the top level object, write the
 * summary counters and open the "DisconnectedChannels" array.
 * @details The object and the array are both left open; the array entries
 * and the closing tokens are written by other calls.
 * @tparam Writer The json writer type
 * @param writer the writer to emit into
 * @param conn the connection whose diag snapshot (conn.data) is dumped
 */
template < typename Writer >
void
client_open_json( Writer& writer, diag_client_conn& conn )
{
    // Emits one "key": <int> member into the currently open object.
    const auto int_member = [ &writer ]( const char* key, int value ) {
        writer.Key( key );
        writer.Int( value );
    };

    writer.StartObject( );
    int_member( "TotalChannels", daqd_edcu1.num_chans );
    int_member( "ConnectedChannelCount", conn.data.con_chans );
    int_member( "DisconnectedChannelCount", conn.data.nocon_chans );
    int_member( "Uptime", conn.data.uptime );
    int_member( "DataRateKbPerSec", conn.data.data_rate );
    int_member( "Gpstime", conn.data.gpstime );

    writer.Key( "DisconnectedChannels" );
    writer.StartArray( );
}
/*!
 * @brief Iterate over the channel list and output channel entries until
 * the list is done, or the supplied buffer reports no more room.
 * @details Only channels whose entry in conn.data.status is non-zero are
 * written; each is emitted as a json string holding the channel name.
 * @tparam Writer The json writer type
 * @tparam Buffer The buffer type, must provide SpaceAvailable()
 * @param writer The writer
 * @param buf The buffer the writer is emitting into
 * @param conn The connection and diag info to iterate over
 * @param cur The index to start on
 * @return The next index to start at (daqd_edcu1.num_chans when it is
 * complete)
 */
template < typename Writer, typename Buffer >
int
client_fill_json( Writer& writer, Buffer& buf, diag_client_conn& conn, int cur )
{
    while ( cur != daqd_edcu1.num_chans && buf.SpaceAvailable( ) )
    {
        const bool flagged = ( conn.data.status[ cur ] != 0 );
        if ( flagged )
        {
            writer.String( daqd_edcu1.channel_name[ cur ] );
        }
        ++cur;
    }
    return cur;
}
/*!
 * @brief Close out the client diagnostics json message.
 * @details Ends the currently open array first and then the enclosing
 * object, in that order.
 * @tparam Writer The json writer type
 * @param writer the writer to emit the closing tokens into
 */
template < typename Writer >
void
client_close_json( Writer& writer )
{
    writer.EndArray( ); // closes the channel list
    writer.EndObject( ); // closes the top level document
}
/*!
 * @brief Calculate the size needed to hold a json message representing the
 * diagnostics for the given client connection.
 * @details The full message (open, fill from index 0, close) is written
 * through a SizeCalcJsonStream and the size it reports is returned.
 * NOTE(review): this relies on SizeCalcJsonStream always reporting space
 * available so that one fill call covers every channel - confirm.
 * @param conn client connection and diagnostics object
 * @return the number of bytes needed to hold the json message
 */
std::size_t
calc_required_size_for_json( diag_client_conn& conn )
{
    SizeCalcJsonStream                      size_probe;
    rapidjson::Writer< SizeCalcJsonStream > probe_writer( size_probe );

    client_open_json( probe_writer, conn );
    client_fill_json( probe_writer, size_probe, conn, 0 );
    client_close_json( probe_writer );

    const std::size_t required = size_probe.GetSize( );
    return required;
}
/*!
 * @brief Send the closing json tokens to the client.
 * @details The buffer stream is reset, the closing tokens are written into
 * it and the result is queued with async_write. The completion handler does
 * nothing except hold a reference to the connection until the write is
 * done.
 * @param conn the client connection to finish
 */
void
client_finalize( std::shared_ptr< diag_client_conn > conn )
{
    auto& stream = conn->buffer_stream;
    stream.Reset( );
    client_close_json( conn->writer );

    const auto closing_bytes =
        boost::asio::buffer( stream.GetString( ), stream.GetSize( ) );
    boost::asio::async_write(
        conn->socket,
        closing_bytes,
        [ conn ]( const boost::system::error_code&, std::size_t ) {} );
}
/*!
 * @brief Write the next chunk of the channel list to the client.
 * @details If the list has already been fully written, or filling the
 * buffer produced no bytes, the message is finished via client_finalize.
 * Otherwise the filled buffer is sent and, when that write completes
 * without error, client_write is called again for the next chunk.
 * @param conn the client connection being served
 */
void
client_write( std::shared_ptr< diag_client_conn > conn )
{
    bool finish_now = ( conn->list_progress == daqd_edcu1.num_chans );
    if ( !finish_now )
    {
        conn->buffer_stream.Reset( );
        conn->list_progress = client_fill_json(
            conn->writer, conn->buffer_stream, *conn, conn->list_progress );
        // Nothing was produced, so there is no chunk to send.
        finish_now = ( conn->buffer_stream.GetSize( ) == 0 );
    }

    if ( finish_now )
    {
        client_finalize( std::move( conn ) );
        return;
    }

    const auto chunk = boost::asio::buffer( conn->buffer_stream.GetString( ),
                                            conn->buffer_stream.GetSize( ) );
    boost::asio::async_write(
        conn->socket,
        chunk,
        [ conn ]( const boost::system::error_code& ec, std::size_t ) {
            if ( ec )
            {
                return;
            }
            client_write( conn );
        } );
}
/*!
 * @brief Start the http response for a client: headers, the json opening
 * and the first chunk of the channel list.
 * @details The json size is calculated first so that it can be placed in
 * the Content-Length header. The header is formatted into conn->buffer and
 * the buffer stream is advanced past it before the json is written. The
 * remaining chunks are sent by client_write once this first write completes
 * without error.
 * @param conn the client connection to respond on
 * @throws std::out_of_range if the formatted header does not fit in
 * conn->buffer
 */
void
start_client_write( std::shared_ptr< diag_client_conn > conn )
{
    static const char* const response_header = "HTTP/1.0 200 Ok\r\n"
                                               "Content-Type: application/json\r\n"
                                               "Cache-Control: max-age=1\r\n"
                                               "Content-Length: %d\r\n"
                                               "Connection: close\r\n\r\n";

    const std::size_t body_size = calc_required_size_for_json( *conn );

    const int printed = snprintf( conn->buffer.data( ),
                                  conn->buffer.size( ),
                                  response_header,
                                  static_cast< int >( body_size ) );
    // A negative snprintf result becomes a huge value here and is rejected
    // by the same check as a truncated header.
    const std::size_t header_size = static_cast< std::size_t >( printed );
    if ( header_size >= conn->buffer.size( ) )
    {
        throw std::out_of_range( "Buffer overflowed while sending headers" );
    }

    // Position the stream directly after the header text.
    conn->buffer_stream.Reset( );
    conn->buffer_stream.AdvanceCounter( header_size );
    conn->writer.Reset( conn->buffer_stream );

    conn->list_progress = 0;
    client_open_json( conn->writer, *conn );
    conn->list_progress = client_fill_json(
        conn->writer, conn->buffer_stream, *conn, conn->list_progress );

    const auto first_chunk =
        boost::asio::buffer( conn->buffer_stream.GetString( ),
                             conn->buffer_stream.GetSize( ) );
    boost::asio::async_write(
        conn->socket,
        first_chunk,
        [ conn ]( const boost::system::error_code& ec, std::size_t ) {
            if ( ec )
            {
                return;
            }
            client_write( conn );
        } );
}
/*!
 * @brief Completion handler for an accepted diagnostics connection.
 * @details On a successful accept the current diag snapshot is copied into
 * the connection and a read is started; the response is written once that
 * read completes without error. On a failed accept the connection is simply
 * dropped, no read is issued on the unconnected socket. In both cases the
 * acceptor is re-armed through accept_loop.
 * @param io_context the io context the acceptor runs on
 * @param acceptor the listening acceptor
 * @param conn the connection object that was handed to async_accept
 * @param ec result of the accept operation
 * @param cur_diag the diag information to snapshot for this client
 */
void
handle_accept( io_context_t&                       io_context,
               tcp::acceptor&                      acceptor,
               std::shared_ptr< diag_client_conn > conn,
               const boost::system::error_code&    ec,
               const diag_info_block&              cur_diag )
{
    if ( !ec )
    {
        conn->data = cur_diag;
        // We have seen some issues with web browsers thinking the connection
        // is closed too early if we just write data out and close as soon as
        // possible, so read some input (don't care what or how much) and
        // then write the response.
        conn->socket.async_read_some(
            boost::asio::buffer( conn->buffer ),
            [ conn ]( const boost::system::error_code& read_ec,
                      std::size_t ) {
                if ( !read_ec )
                {
                    conn->io_context.post(
                        [ conn ]( ) { start_client_write( conn ); } );
                }
            } );
    }
    // Always keep listening, whether or not this accept succeeded.
    accept_loop( io_context, acceptor, cur_diag );
}
/*!
 * @brief Arm the acceptor for the next incoming diagnostics connection.
 * @details A new diag_client_conn is created for the connection and
 * handle_accept is invoked with it when the accept completes.
 * @param io_context the io context used to construct the connection object
 * @param acceptor the listening acceptor
 * @param cur_diag the diag information passed on to handle_accept
 */
void
accept_loop( io_context_t&          io_context,
             tcp::acceptor&         acceptor,
             const diag_info_block& cur_diag )
{
    auto pending = std::make_shared< diag_client_conn >( io_context );

    auto on_accept = [ &io_context, &acceptor, pending, &cur_diag ](
                         const boost::system::error_code& ec ) {
        handle_accept( io_context, acceptor, pending, ec, cur_diag );
    };
    acceptor.async_accept( pending->socket, std::move( on_accept ) );
}
/*!
 * @brief The diag thread main loop.
 * @details Builds the table of PVs served over epics, starts the two
 * periodic timers (diag info polling and epics server updates), opens the
 * http diagnostics listener and then runs the io context.
 * @param args Arguments to the diag thread (message queues and the address
 * to listen on)
 * @param internal_names Provides the prefix handed to the PV server
 * @note The four numeric values after each data pointer in a SimplePV entry
 * look like alarm/warning limits - confirm against the SimplePV declaration.
 */
void
diag_thread_mainloop( diag_thread_args*  args,
                      InternalChanNames* internal_names )
{
    diag_info_block cur_diag;

    // Limit values that are shared by several entries in the table below.
    constexpr int    int_ceiling = std::numeric_limits< int >::max( );
    constexpr double double_ceiling = std::numeric_limits< double >::max( );
    constexpr int    data_rate_ceiling = 4 * 1024;
    const auto       chan_count_above =
        static_cast< double >( daqd_edcu1.num_chans + 1 );
    const auto chan_count_below =
        static_cast< double >( daqd_edcu1.num_chans - 1 );

    std::vector< SimplePV > pv_table{
        SimplePV{ InternalChanNames::chan_conn_suffix( ),
                  SIMPLE_PV_INT,
                  &cur_diag.con_chans,
                  int_ceiling,
                  chan_count_below,
                  int_ceiling,
                  chan_count_below },
        SimplePV{ InternalChanNames::chan_noconn_suffix( ),
                  SIMPLE_PV_INT,
                  &cur_diag.nocon_chans,
                  1,
                  -1,
                  1,
                  -1 },
        SimplePV{ InternalChanNames::chan_cnt_suffix( ),
                  SIMPLE_PV_INT,
                  &daqd_edcu1.num_chans,
                  chan_count_above,
                  chan_count_below,
                  chan_count_above,
                  chan_count_below },
        SimplePV{ InternalChanNames::uptime_suffix( ),
                  SIMPLE_PV_INT,
                  &cur_diag.uptime,
                  double_ceiling,
                  0.0,
                  double_ceiling,
                  0.0 },
        SimplePV{ InternalChanNames::data_rate_suffix( ),
                  SIMPLE_PV_INT,
                  &cur_diag.data_rate,
                  data_rate_ceiling,
                  0.0,
                  data_rate_ceiling,
                  0.0 },
        SimplePV{ InternalChanNames::gpstime_suffix( ),
                  SIMPLE_PV_INT,
                  &cur_diag.gpstime,
                  double_ceiling,
                  0.0,
                  double_ceiling,
                  0.0 },
    };

    PvServerManager epics_server( internal_names->prefix( ), pv_table );

    io_context_t       io_context;
    io_context_t::work keep_running( io_context );

    // Periodically refresh cur_diag from the main thread's messages.
    boost::asio::deadline_timer diag_timer( io_context );
    get_diag_info_handler( diag_timer, args->queues, cur_diag );

    // Periodically service the epics PV server.
    boost::asio::deadline_timer epics_timer( io_context );
    epics_timer_handler( epics_timer, epics_server.get( ) );

    // The http diagnostics listener.
    const tcp::endpoint listen_endpoint(
        make_address( args->address.first.c_str( ) ), args->address.second );
    tcp::acceptor acceptor( io_context, listen_endpoint );
    accept_loop( io_context, acceptor, cur_diag );

    io_context.run( );
}