Something went wrong on our end
Jonathan Hanks authored
The buffer enters into a late state after its data has been read out. Any data for that cycle that arrives is considered late, until the next cycle comes in, at which point it is marked as dropped. This updates the late state when data is copied out of the buffer and clears it when a new gps second/cycle is accepted in. Closes #119.
Jonathan Hanks authoredThe buffer enters into a late state after its data has been read out. Any data for that cycle that arrives is considered late, until the next cycle comes in, at which point it is marked as dropped. This updates the late state when data is copied out of the buffer and clears it when a new gps second/cycle is accepted in. Closes #119.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
recv_buffer.hh 12.58 KiB
// Created by jonathan.hanks on 12/16/19.
#include "daq_core.h"
#include <algorithm>
#include <array>
#include <functional>
#include <iterator>
#include <mutex>
#include <iostream>
#include <stdint.h>
#include <sys/time.h>
#include "atomic_config.h"
//#include "struct_compact.hh"
#include <atomic>
template < typename T >
load_atomically( const T& item )
return reinterpret_cast< const std::atomic< T >& >( item ).load( );
template < typename T >
store_atomically( T& item, const T value )
reinterpret_cast< std::atomic< T >& >( item ).store( value );
template < typename T >
load_atomically( const T& item )
return __sync_or_and_fetch( const_cast< T* >( &item ),
static_cast< T >( 0 ) );
template < typename T >
store_atomically( T& item, const T value )
T old_val;
old_val = load_atomically( item );
while ( old_val != value )
old_val = __sync_val_compare_and_swap( &( &item ), old_val, value );
* @brief std::begin for arrays on old gcc
template < typename T, size_t N >
T* array_begin( T ( &array )[ N ] )
return array;
* @brief std::end for arrays on old gcc
template < typename T, size_t N >
T* array_end( T ( &array )[ N ] )
return array + N;
* @brief a combination of gps seconds and cycle number put together in a way
* that can be used as an indexing key.
struct gps_key
typedef uint64_t key_type;
typedef uint32_t gps_type;
typedef uint32_t cycle_type;
key_type key;
gps_key( ) : key( 0 )
gps_key( key_type sec, cycle_type cycle )
: key( ( static_cast< key_type >( sec ) << shift_amount( ) ) |
( cycle & cycle_mask( ) ) )
gps_key( const gps_key& other ) = default;
gps_key& operator=( const gps_key& other ) = default;
gps( ) const
return static_cast< gps_type >( key >> shift_amount( ) );
cycle( ) const
return static_cast< cycle_type >( key & cycle_mask( ) );
operator==( const gps_key& other ) const
return key == other.key;
operator!=( const gps_key& other ) const
return key != other.key;
operator<( const gps_key& other ) const
return key < other.key;
operator<=( const gps_key& other ) const
return key <= other.key;
operator>( const gps_key& other ) const
return key > other.key;
operator>=( const gps_key& other ) const
return key >= other.key;
atomic_load_from( const gps_key* source )
key = load_atomically( source->key );
// void
// atomic_store_to( gps_key* dest )
// {
// gps_key old_key;
// old_key.atomic_load_from( dest );
// while ( old_key != *this )
// {
// old_key.key =
// __sync_val_compare_and_swap( &( dest->key ), old_key.key,
// key );
// }
// }
template < typename Pred >
atomic_store_to_if( gps_key* dest, Pred& pred = Pred( ) )
gps_key old_key;
old_key.atomic_load_from( dest );
while ( pred( old_key, *this ) )
old_key.key =
__sync_val_compare_and_swap( &( dest->key ), old_key.key, key );
explicit gps_key( key_type new_key ) : key( new_key )
static key_type
shift_amount( )
return 4;
static cycle_type
cycle_mask( )
return static_cast< cycle_type >( 0xf );
struct buffer_headers
std::array< daq_msg_header_t, DAQ_TRANSIT_MAX_DCU > headers;
std::array< int64_t, DAQ_TRANSIT_MAX_DCU > time_ingested;
gps_key latest;
unsigned int dcu_count;
struct buffer_entry
buffer_entry( std::atomic< int64_t >* track_late_here = nullptr,
std::atomic< int64_t >* track_discards_here = nullptr,
std::atomic< int64_t >* track_total_span_here = nullptr )
: m( ), latest( ), ifo_data( ), data( &ifo_data.dataBlock[ 0 ] ),
time_ingested( ), first_injestion( 0 ), last_injestion( 0 ),
is_late_( false ), dummy_tracking_( 0 ),
late_notification_( track_late_here ? track_late_here
: &dummy_tracking_ ),
discard_notification_( track_discards_here ? track_discards_here
: &dummy_tracking_ ),
span_notification_( track_total_span_here ? track_total_span_here
: &dummy_tracking_ )
std::mutex m;
gps_key latest;
daq_dc_data_t ifo_data;
char* data;
std::array< int64_t, DAQ_TRANSIT_MAX_DCU > time_ingested;
int64_t first_injestion;
int64_t last_injestion;
bool is_late_;
std::atomic< int64_t > dummy_tracking_;
std::atomic< int64_t >* late_notification_;
std::atomic< int64_t >* discard_notification_;
std::atomic< int64_t >* span_notification_;
setup_tracking( std::atomic< int64_t >* track_late_here,
std::atomic< int64_t >* track_discards_here,
std::atomic< int64_t >* track_total_span_here )
std::lock_guard< std::mutex > l_( m );
late_notification_ =
( track_late_here ? track_late_here : &dummy_tracking_ );
discard_notification_ =
( track_discards_here ? track_discards_here : &dummy_tracking_ );
span_notification_ = ( track_total_span_here ? track_total_span_here
: &dummy_tracking_ );
static int64_t
time_now( )
timeval tv;
gettimeofday( &tv, 0 );
return static_cast< int64_t >( tv.tv_sec * 1000 + tv.tv_usec / 1000 );
ingest( const daq_multi_dcu_data_t& input )
gps_key key( input.header.dcuheader[ 0 ].timeSec,
input.header.dcuheader[ 0 ].cycle );
int64_t timestamp = time_now( );
std::lock_guard< std::mutex > l_( m );
if ( key > latest )
( *span_notification_ )
.exchange( last_injestion - first_injestion );
clear( );
first_injestion = timestamp;
else if ( key < latest )
( *discard_notification_ )++;
if ( is_late_ )
( *late_notification_ )++;
last_injestion = timestamp;
// expand_into_daq_struct((void*)&input, 64, &ifo_data);
const char* input_data = &input.dataBlock[ 0 ];
for ( int i = 0; i < input.header.dcuTotalModels; ++i )
const daq_msg_header_t& cur_header = input.header.dcuheader[ i ];
if ( cur_header.dcuId == 0 )
int dest_index = ifo_data.header.dcuTotalModels;
if ( dest_index >= DAQ_TRANSIT_MAX_DCU )
ifo_data.header.dcuheader[ dest_index ] = cur_header;
std::size_t block_size =
cur_header.dataBlockSize + cur_header.tpBlockSize;
if ( data + block_size > array_end( ifo_data.dataBlock ) )
std::copy( input_data, input_data + block_size, data );
data += block_size;
input_data += block_size;
ifo_data.header.fullDataBlockSize += block_size;
time_ingested[ dest_index ] = timestamp;
latest = key;
template < typename Callback >
process_if( const gps_key process_key, Callback& cb = Callback( ) )
std::lock_guard< std::mutex > l_( m );
if ( process_key != latest )
cb( ifo_data );
// anything received after this point is late.
is_late_ = true;
copy_headers( buffer_headers& dest )
std::lock_guard< std::mutex > l_( m );
std::copy( array_begin( ifo_data.header.dcuheader ),
array_end( ifo_data.header.dcuheader ),
dest.headers.begin( ) );
std::copy( time_ingested.begin( ),
time_ingested.end( ),
dest.time_ingested.begin( ) );
dest.latest = latest;
dest.dcu_count = ifo_data.header.dcuTotalModels;
clear( )
ifo_data.header.dcuTotalModels = 0;
ifo_data.header.fullDataBlockSize = 0;
data = &ifo_data.dataBlock[ 0 ];
is_late_ = false;
template < int N >
struct receive_buffer
#ifdef __cpp_static_assert
static_assert( N > 0, "The receive buffer must have at least 1 segment" );
static_assert( N <= 16, "A second is the most the buffer will hold" );
typedef std::mutex mutex_type;
receive_buffer( )
: latest_{}, late_entries_{ 0 }, discarded_entries_{ 0 },
cycle_message_span_( ), buffer_{}
for ( auto& buffer : buffer_ )
&late_entries_, &discarded_entries_, &cycle_message_span_ );
receive_buffer( const receive_buffer& other ) = delete;
receive_buffer( receive_buffer&& other ) = delete;
receive_buffer& operator=( const receive_buffer& other ) = delete;
receive_buffer& operator=( receive_buffer&& other ) = delete;
ingest( const daq_multi_dcu_data_t& input )
if ( input.header.dcuTotalModels <= 0 )
int index = cycle_to_index( input.header.dcuheader[ 0 ].cycle );
buffer_entry& target_buffer = buffer_[ index ];
gps_key key( input.header.dcuheader[ 0 ].timeSec,
input.header.dcuheader[ 0 ].cycle );
target_buffer.ingest( input );
std::less< gps_key > comp;
key.atomic_store_to_if( &latest_, comp );
latest( ) const
gps_key key;
key.atomic_load_from( &latest_ );
return key;
get_and_clear_late( )
return 0 );
get_and_clear_discards( )
return 0 );
get_and_clear_cycle_message_span( )
return 0 );
* @brief Act on/process a slice of the buffer, as indexed by the given
* gps_key
* @tparam Callback The handler to process data with
* @param process_key The key to index
* @param cb The instance of the handler, a callable which is invoked with
* the daq data as its argument
* @note This will acquire any locks or do any synchronization to ensure
* that cb is called with exclusive access to the data block.
* @note Note that if process_key is not in the buffer, then cb will NOT be
* called.
template < typename Callback >
process_slice_at( const gps_key process_key, Callback& cb = Callback( ) )
int index = cycle_to_index( process_key.cycle( ) );
buffer_entry& target_buffer = buffer_[ index ];
target_buffer.process_if( process_key, cb );
template < typename It >
copy_headers( It dest )
for ( size_t i = 0; i < N; ++i )
buffer_[ i ].copy_headers( *dest );
size( ) const
return N;
static int
cycle_to_index( gps_key::cycle_type cycle )
return cycle % N;
gps_key latest_;
std::atomic< std::int64_t > late_entries_;
std::atomic< std::int64_t > discarded_entries_;
std::atomic< std::int64_t > cycle_message_span_;
std::array< buffer_entry, N > buffer_;