Skip to content
Snippets Groups Projects
Commit b56270a0 authored by Erik von Reis's avatar Erik von Reis
Browse files

Merge branch 'pub_sub_recv_fixes' into 'master'

Fix for #119.  Properly set the late state of the receive buffer in the pub_sub receivers.

Closes #119

See merge request cds/advligorts!110
parents 28db6c45 b1ee6f83
No related branches found
No related tags found
1 merge request!110Fix for #119. Properly set the late state of the receive buffer in the pub_sub receivers.
...@@ -81,6 +81,13 @@ if (libcds-pubsub_FOUND) ...@@ -81,6 +81,13 @@ if (libcds-pubsub_FOUND)
# sub_plugin_rmipc) # sub_plugin_rmipc)
# target_requires_cpp11(cds_pub_sub_asan PUBLIC) # target_requires_cpp11(cds_pub_sub_asan PUBLIC)
add_executable(test_buffer_entry tests/test_buffer_entry.cc)
target_include_directories(test_buffer_entry PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../include)
target_requires_cpp11(test_buffer_entry PUBLIC)
configure_file(test_pub_sub_xmit_recv.sh.in test_pub_sub_xmit_recv.sh @ONLY) configure_file(test_pub_sub_xmit_recv.sh.in test_pub_sub_xmit_recv.sh @ONLY)
configure_file(test_pub_sub.sh.in test_pub_sub.sh @ONLY) configure_file(test_pub_sub.sh.in test_pub_sub.sh @ONLY)
configure_file(test_pub_sub_rmipc_to_daqm.sh.in test_pub_sub_rmipc_to_daqm.sh @ONLY) configure_file(test_pub_sub_rmipc_to_daqm.sh.in test_pub_sub_rmipc_to_daqm.sh @ONLY)
...@@ -102,6 +109,10 @@ if (libcds-pubsub_FOUND) ...@@ -102,6 +109,10 @@ if (libcds-pubsub_FOUND)
COMMAND /bin/bash ./test_pub_sub_daqm_to_daqm.sh COMMAND /bin/bash ./test_pub_sub_daqm_to_daqm.sh
WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
add_test(NAME "test_buffer_entry"
COMMAND test_buffer_entry)
install(TARGETS cps_xmit DESTINATION bin) install(TARGETS cps_xmit DESTINATION bin)
install(TARGETS cps_recv DESTINATION bin) install(TARGETS cps_recv DESTINATION bin)
install(TARGETS cds_pub_sub DESTINATION bin) install(TARGETS cds_pub_sub DESTINATION bin)
......
...@@ -323,6 +323,8 @@ struct buffer_entry ...@@ -323,6 +323,8 @@ struct buffer_entry
return; return;
} }
cb( ifo_data ); cb( ifo_data );
// anything received after this point is late.
is_late_ = true;
} }
void void
...@@ -346,6 +348,7 @@ private: ...@@ -346,6 +348,7 @@ private:
ifo_data.header.dcuTotalModels = 0; ifo_data.header.dcuTotalModels = 0;
ifo_data.header.fullDataBlockSize = 0; ifo_data.header.fullDataBlockSize = 0;
data = &ifo_data.dataBlock[ 0 ]; data = &ifo_data.dataBlock[ 0 ];
is_late_ = false;
} }
}; };
......
//
// Created by jonathan.hanks on 5/27/20.
//
#include "recv_buffer.hh"
#include <algorithm>
#include <iostream>
#include <memory>
std::shared_ptr< daq_multi_dcu_data_t >
dummy_data( unsigned int dcu_id, unsigned int gps, int cycle )
{
auto data = std::make_shared< daq_multi_dcu_data_t >( );
data->header.dcuTotalModels = 1;
daq_msg_header_t& header = data->header.dcuheader[ 0 ];
header.dcuId = dcu_id;
header.dataBlockSize = 1000;
header.tpBlockSize = 0;
header.timeSec = gps;
header.timeNSec = ( 1000000000 / 16 ) * cycle;
header.cycle = cycle;
header.tpCount = 0;
std::fill( std::begin( header.tpNum ), std::end( header.tpNum ), 0 );
header.status = 0;
auto start = &( data->dataBlock[ 0 ] );
std::fill( start, start + 1000, 0xfe );
data->header.fullDataBlockSize = 1000;
return data;
}
void
test_buffer_entry_late_entry( )
{
const int gps_time = 1000000000;
std::atomic< int64_t > late_entries{ 0 };
std::atomic< int64_t > discarded_entries{ 0 };
std::atomic< int64_t > total_span{ 0 };
auto buffer = std::make_shared< buffer_entry >(
&late_entries, &discarded_entries, &total_span );
auto data = dummy_data( 5, gps_time, 0 );
buffer->ingest( *data );
if ( late_entries != 0 || discarded_entries != 0 )
{
throw std::runtime_error( "unexpected late or discarded entry" );
}
data = dummy_data( 6, gps_time, 0 );
buffer->ingest( *data );
if ( late_entries != 0 || discarded_entries != 0 )
{
throw std::runtime_error( "unexpected late or discarded entry" );
}
bool cb_called{ false };
auto cb = [&cb_called]( const daq_dc_data_t& dc ) { cb_called = true; };
buffer->process_if( gps_key( gps_time, 0 ), cb );
if ( !cb_called )
{
throw std::runtime_error( "Processing callback not called" );
}
data = dummy_data( 7, gps_time, 0 );
buffer->ingest( *data );
if ( late_entries != 1 )
{
throw std::runtime_error( "Expected late entry not marked as late" );
}
if ( discarded_entries != 0 )
{
throw std::runtime_error( "Unexpected discard entry" );
}
late_entries.store( 0 );
data = dummy_data( 7, gps_time - 1, 0 );
buffer->ingest( *data );
if ( discarded_entries != 1 )
{
throw std::runtime_error(
"Expected discard entry not marked as discarded" );
}
if ( late_entries != 0 )
{
throw std::runtime_error( "Unexpected entry marked late" );
}
}
int
main( int argc, char* argv[] )
{
test_buffer_entry_late_entry( );
return 0;
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment