diff --git a/src/pub_sub_stream/CMakeLists.txt b/src/pub_sub_stream/CMakeLists.txt index de8be78dc9302fd19f21cbb46a0c8e702466b5f7..2049cec31e43142b3b7581c3df6f6f3a00414784 100644 --- a/src/pub_sub_stream/CMakeLists.txt +++ b/src/pub_sub_stream/CMakeLists.txt @@ -81,6 +81,13 @@ if (libcds-pubsub_FOUND) # sub_plugin_rmipc) # target_requires_cpp11(cds_pub_sub_asan PUBLIC) + add_executable(test_buffer_entry tests/test_buffer_entry.cc) + target_include_directories(test_buffer_entry PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/../include) + target_requires_cpp11(test_buffer_entry PUBLIC) + configure_file(test_pub_sub_xmit_recv.sh.in test_pub_sub_xmit_recv.sh @ONLY) configure_file(test_pub_sub.sh.in test_pub_sub.sh @ONLY) configure_file(test_pub_sub_rmipc_to_daqm.sh.in test_pub_sub_rmipc_to_daqm.sh @ONLY) @@ -102,6 +109,10 @@ if (libcds-pubsub_FOUND) COMMAND /bin/bash ./test_pub_sub_daqm_to_daqm.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_buffer_entry" + COMMAND test_buffer_entry) + + install(TARGETS cps_xmit DESTINATION bin) install(TARGETS cps_recv DESTINATION bin) install(TARGETS cds_pub_sub DESTINATION bin) diff --git a/src/pub_sub_stream/recv_buffer.hh b/src/pub_sub_stream/recv_buffer.hh index 3c2bbb9b69f843ebedb6871f5b143e8de1cafb65..89cd54b7be11954e4638fbd3c18a4d42fcd87393 100644 --- a/src/pub_sub_stream/recv_buffer.hh +++ b/src/pub_sub_stream/recv_buffer.hh @@ -323,6 +323,8 @@ struct buffer_entry return; } cb( ifo_data ); + // anything received after this point is late. + is_late_ = true; } void @@ -346,6 +348,7 @@ private: ifo_data.header.dcuTotalModels = 0; ifo_data.header.fullDataBlockSize = 0; data = &ifo_data.dataBlock[ 0 ]; + is_late_ = false; } }; diff --git a/src/pub_sub_stream/tests/test_buffer_entry.cc b/src/pub_sub_stream/tests/test_buffer_entry.cc new file mode 100644 index 0000000000000000000000000000000000000000..ea68a1021f14c74ccbded571f9393c3676524169 --- /dev/null +++ b/src/pub_sub_stream/tests/test_buffer_entry.cc @@ -0,0 +1,97 @@ +// +// Created by jonathan.hanks on 5/27/20. +// + +#include "recv_buffer.hh" + +#include <algorithm> +#include <iostream> +#include <memory> + +std::shared_ptr< daq_multi_dcu_data_t > +dummy_data( unsigned int dcu_id, unsigned int gps, int cycle ) +{ + auto data = std::make_shared< daq_multi_dcu_data_t >( ); + data->header.dcuTotalModels = 1; + daq_msg_header_t& header = data->header.dcuheader[ 0 ]; + header.dcuId = dcu_id; + header.dataBlockSize = 1000; + header.tpBlockSize = 0; + header.timeSec = gps; + header.timeNSec = ( 1000000000 / 16 ) * cycle; + header.cycle = cycle; + header.tpCount = 0; + std::fill( std::begin( header.tpNum ), std::end( header.tpNum ), 0 ); + header.status = 0; + auto start = &( data->dataBlock[ 0 ] ); + std::fill( start, start + 1000, 0xfe ); + data->header.fullDataBlockSize = 1000; + return data; +} + +void +test_buffer_entry_late_entry( ) +{ + const int gps_time = 1000000000; + + std::atomic< int64_t > late_entries{ 0 }; + std::atomic< int64_t > discarded_entries{ 0 }; + std::atomic< int64_t > total_span{ 0 }; + auto buffer = std::make_shared< buffer_entry >( + &late_entries, &discarded_entries, &total_span ); + + auto data = dummy_data( 5, gps_time, 0 ); + buffer->ingest( *data ); + + if ( late_entries != 0 || discarded_entries != 0 ) + { + throw std::runtime_error( "unexpected late or discarded entry" ); + } + + data = dummy_data( 6, gps_time, 0 ); + buffer->ingest( *data ); + + if ( late_entries != 0 || discarded_entries != 0 ) + { + throw std::runtime_error( "unexpected late or discarded entry" ); + } + + bool cb_called{ false }; + auto cb = [&cb_called]( const daq_dc_data_t& dc ) { cb_called = true; }; + buffer->process_if( gps_key( gps_time, 0 ), cb ); + if ( !cb_called ) + { + throw std::runtime_error( "Processing callback not called" ); + } + + data = dummy_data( 7, gps_time, 0 ); + buffer->ingest( *data ); + if ( late_entries != 1 ) + { + throw std::runtime_error( "Expected late entry not marked as late" ); + } + if ( discarded_entries != 0 ) + { + throw std::runtime_error( "Unexpected discard entry" ); + } + + late_entries.store( 0 ); + data = dummy_data( 7, gps_time - 1, 0 ); + buffer->ingest( *data ); + if ( discarded_entries != 1 ) + { + throw std::runtime_error( + "Expected discard entry not marked as discarded" ); + } + if ( late_entries != 0 ) + { + throw std::runtime_error( "Unexpected entry marked late" ); + } +} + +int +main( int argc, char* argv[] ) +{ + test_buffer_entry_late_entry( ); + return 0; +} \ No newline at end of file