Skip to content
Snippets Groups Projects
Commit b1ee6f83 authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Fix for #119. Properly set the late state of the receive buffer in the pub_sub receivers.

The buffer enters into a late state after its data has been read out.  Any data for that cycle that arrives is considered late, until the next cycle comes in, at which point it is marked as dropped.

This updates the late state when data is copied out of the buffer and clears it when a new gps second/cycle is accepted in.

Closes #119.
parent 28db6c45
No related branches found
No related tags found
1 merge request!110Fix for #119. Properly set the late state of the receive buffer in the pub_sub receivers.
......@@ -81,6 +81,13 @@ if (libcds-pubsub_FOUND)
# sub_plugin_rmipc)
# target_requires_cpp11(cds_pub_sub_asan PUBLIC)
add_executable(test_buffer_entry tests/test_buffer_entry.cc)
target_include_directories(test_buffer_entry PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../include)
target_requires_cpp11(test_buffer_entry PUBLIC)
configure_file(test_pub_sub_xmit_recv.sh.in test_pub_sub_xmit_recv.sh @ONLY)
configure_file(test_pub_sub.sh.in test_pub_sub.sh @ONLY)
configure_file(test_pub_sub_rmipc_to_daqm.sh.in test_pub_sub_rmipc_to_daqm.sh @ONLY)
......@@ -102,6 +109,10 @@ if (libcds-pubsub_FOUND)
COMMAND /bin/bash ./test_pub_sub_daqm_to_daqm.sh
WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
add_test(NAME "test_buffer_entry"
COMMAND test_buffer_entry)
install(TARGETS cps_xmit DESTINATION bin)
install(TARGETS cps_recv DESTINATION bin)
install(TARGETS cds_pub_sub DESTINATION bin)
......
......@@ -323,6 +323,8 @@ struct buffer_entry
return;
}
cb( ifo_data );
// anything received after this point is late.
is_late_ = true;
}
void
......@@ -346,6 +348,7 @@ private:
ifo_data.header.dcuTotalModels = 0;
ifo_data.header.fullDataBlockSize = 0;
data = &ifo_data.dataBlock[ 0 ];
is_late_ = false;
}
};
......
//
// Created by jonathan.hanks on 5/27/20.
//
#include "recv_buffer.hh"
#include <algorithm>
#include <iostream>
#include <memory>
std::shared_ptr< daq_multi_dcu_data_t >
dummy_data( unsigned int dcu_id, unsigned int gps, int cycle )
{
auto data = std::make_shared< daq_multi_dcu_data_t >( );
data->header.dcuTotalModels = 1;
daq_msg_header_t& header = data->header.dcuheader[ 0 ];
header.dcuId = dcu_id;
header.dataBlockSize = 1000;
header.tpBlockSize = 0;
header.timeSec = gps;
header.timeNSec = ( 1000000000 / 16 ) * cycle;
header.cycle = cycle;
header.tpCount = 0;
std::fill( std::begin( header.tpNum ), std::end( header.tpNum ), 0 );
header.status = 0;
auto start = &( data->dataBlock[ 0 ] );
std::fill( start, start + 1000, 0xfe );
data->header.fullDataBlockSize = 1000;
return data;
}
void
test_buffer_entry_late_entry( )
{
const int gps_time = 1000000000;
std::atomic< int64_t > late_entries{ 0 };
std::atomic< int64_t > discarded_entries{ 0 };
std::atomic< int64_t > total_span{ 0 };
auto buffer = std::make_shared< buffer_entry >(
&late_entries, &discarded_entries, &total_span );
auto data = dummy_data( 5, gps_time, 0 );
buffer->ingest( *data );
if ( late_entries != 0 || discarded_entries != 0 )
{
throw std::runtime_error( "unexpected late or discarded entry" );
}
data = dummy_data( 6, gps_time, 0 );
buffer->ingest( *data );
if ( late_entries != 0 || discarded_entries != 0 )
{
throw std::runtime_error( "unexpected late or discarded entry" );
}
bool cb_called{ false };
auto cb = [&cb_called]( const daq_dc_data_t& dc ) { cb_called = true; };
buffer->process_if( gps_key( gps_time, 0 ), cb );
if ( !cb_called )
{
throw std::runtime_error( "Processing callback not called" );
}
data = dummy_data( 7, gps_time, 0 );
buffer->ingest( *data );
if ( late_entries != 1 )
{
throw std::runtime_error( "Expected late entry not marked as late" );
}
if ( discarded_entries != 0 )
{
throw std::runtime_error( "Unexpected discard entry" );
}
late_entries.store( 0 );
data = dummy_data( 7, gps_time - 1, 0 );
buffer->ingest( *data );
if ( discarded_entries != 1 )
{
throw std::runtime_error(
"Expected discard entry not marked as discarded" );
}
if ( late_entries != 0 )
{
throw std::runtime_error( "Unexpected entry marked late" );
}
}
int
main( int argc, char* argv[] )
{
test_buffer_entry_late_entry( );
return 0;
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment