Commit fce42784 authored by Erik von Reis's avatar Erik von Reis

Merge branch 'cps_recv_bound_recieve_window' into 'master'

cps_recv: Work on issue #207 dealing with wildly bad input time values.

See merge request cds/advligorts!202
parents 933727a4 eda6a2a3
//
// Created by jonathan.hanks on 1/13/21.
//
#ifndef DAQD_TRUNK_MESSAGE_FILTER_HH
#define DAQD_TRUNK_MESSAGE_FILTER_HH
#include <atomic>
#include <chrono>
#include <fstream>
#include <thread>
#include <cds-pubsub/sub.hh>
/*!
* @brief A simple filter to reject messages that have keys (times) wildly
* different than current gps time.
*/
class GpsMessageFilter
{
public:
/*!
* @brief initialize the message filter
* @param max_skew the max skew +-seconds allowed
*/
explicit GpsMessageFilter( int max_skew )
: max_skew_{ static_cast< int64_t >( max_skew ) }
{
}
GpsMessageFilter( const GpsMessageFilter& ) = delete;
GpsMessageFilter( GpsMessageFilter&& ) = delete;
GpsMessageFilter& operator=( const GpsMessageFilter& ) = delete;
GpsMessageFilter& operator=( GpsMessageFilter&& ) = delete;
/*!
* @brief Tell the run_updater routine it can exit.
*/
void
stop( ) noexcept
{
stopping_ = true;
}
/*!
* @brief A main loop that can be used by a thread to keep the current gps
* time updated.
*/
void
run_updater( )
{
while ( !stopping_ )
{
update_gps( );
std::this_thread::sleep_for( std::chrono::seconds( 25 ) );
}
}
/*!
* @brief update the internal gps time value.
* @note This attempts to read /proc/gps and if that fails falls back to
* system time + an offset
*/
void
update_gps( )
{
std::int64_t cur_time{ 0 };
std::fstream gps_file( "/proc/gps" );
if ( gps_file )
{
gps_file >> cur_time;
if ( gps_file.good( ) )
{
cur_gps_ = cur_time;
return;
}
}
auto seconds_since_epoch =
std::chrono::duration_cast< std::chrono::seconds >(
std::chrono::system_clock::now( ).time_since_epoch( ) );
// yes this is hard coded, our typical window is big enough that
// not being up to date on leap seconds is not a big issue.
cur_time = static_cast< std::int64_t >( seconds_since_epoch.count( ) ) -
315964782;
cur_gps_ = cur_time;
return;
}
/*!
* @brief Get the gps time as understood by the filter
* @return the gps time in seconds
*/
std::int64_t
gps( ) const noexcept
{
return cur_gps_;
}
/*!
* @brief the filter function
* @param msg the message to check the time (key) for
* @return false if it is too far away from gps time else true
*/
bool
operator( )( const pub_sub::SubMessage& msg ) const
{
auto gps = cur_gps_.load( );
if ( gps == 0 )
{
return true;
}
auto msg_time = extract_gps( msg );
if ( msg_time < 0 )
{
return false;
}
auto delta = msg_time - gps;
delta = ( delta < 0 ? -delta : delta );
return ( delta <= max_skew_ );
}
private:
/*!
* @brief extract the gps time from the msg
* @return The gps time or -1 if it could not be extracted
*/
static std::int64_t
extract_gps( const pub_sub::SubMessage& msg ) noexcept
{
if ( msg.size( ) <= sizeof( daq_multi_dcu_header_t ) )
{
return -1;
}
auto headers =
reinterpret_cast< daq_multi_dcu_header_t* >( msg.data( ) );
if ( headers->dcuTotalModels == 0 )
{
return -1;
}
return static_cast< std::int64_t >( headers->dcuheader[ 0 ].timeSec );
}
std::int64_t max_skew_{ 0 };
std::atomic< std::int64_t > cur_gps_{ 0 };
std::atomic< bool > stopping_{ false };
};
#endif // DAQD_TRUNK_MESSAGE_FILTER_HH
......@@ -44,6 +44,8 @@
#include "make_unique.hh"
#include "dc_stats.hh"
#include "message_filter.hh"
#define __CDECL
#define MIN_DELAY_MS 5
......@@ -426,6 +428,7 @@ main( int argc, char** argv )
const char* admin_interface = nullptr;
const char* channel_list_file = nullptr;
std::vector< std::string > subscription_strings{};
int max_message_skew = 0;
int thread_per_sub = 0;
int dcus_received = 0;
int max_dcus_received = 0;
......@@ -544,6 +547,14 @@ main( int argc, char** argv )
"A primary channel list file",
&channel_list_file,
"" );
args_add_int( arg_parser,
ARGS_NO_SHORT,
"time-delta",
"seconds",
"The largest time skew to allow between cur GPS and the "
"message time before rejecting a message",
&max_message_skew,
300 );
if ( args_parse( arg_parser, argc, argv ) < 0 )
{
......@@ -905,6 +916,9 @@ main( int argc, char** argv )
}
SimplePVCloser epics_server_teardown_( epics_server );
GpsMessageFilter msg_filter( max_message_skew );
std::thread gps_thread( [&msg_filter]( ) { msg_filter.run_updater( ); } );
// set up to catch Control C
signal( SIGINT, intHandler );
// setup to ignore sig pipe
......@@ -932,7 +946,11 @@ main( int argc, char** argv )
subscribers.front( )->SetupDebugHooks( &debug );
fprintf( stderr, "Beginning subscription on %s\n", conn_str.c_str( ) );
auto sub_id = subscribers.front( )->subscribe(
conn_str, []( pub_sub::SubMessage sub_msg ) {
conn_str, [&msg_filter]( pub_sub::SubMessage sub_msg ) {
if ( !msg_filter( sub_msg ) )
{
return;
}
circular_buffer.ingest(
*( (daq_multi_dcu_data_t*)sub_msg.data( ) ) );
} );
......@@ -1088,6 +1106,9 @@ main( int argc, char** argv )
} while ( keepRunning ); // End of infinite loop
msg_filter.stop( );
gps_thread.join( );
// We never exit unless we are killed, so always return an error
return 1;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment