#include <config.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <iostream>
using namespace std;
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#include <stdio.h>
#include "circ.hh"
#include "raii.hh"
#ifndef SCOPE_PROGRAM
#include "daqd.hh"
extern daqd_c daqd;
#endif
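/*
Allocate the shared circular buffer with malloc() and initialize its
mutexes, condition variables and consumer masks.
Returns 0 on success, -1 if the allocation fails.
*/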
int
circ_buffer::buffer_malloc( int consumers,
int blocks,
long block_size,
time_t block_period )
{
long bufs;
// round the block size up to a multiple of 8;
// this must be done to avoid SIGBUS errors when accessing double data
if ( block_size % 8 )
block_size += 8 - block_size % 8;
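// One allocation holds the circ_buffer_t header plus the data area; the
// `- 1' suggests the header ends in a one-byte placeholder array that the
// `blocks * block_size' data region overlays.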
pbuffer = (circ_buffer_t*)malloc( bufs = blocks * block_size +
sizeof( circ_buffer_t ) - 1 );
if ( !pbuffer )
return -1;
memset( pbuffer, 0, bufs );
pthread_mutex_init( &pbuffer->lock, NULL );
pbuffer->blocks = blocks;
pbuffer->block_size = block_size;
pbuffer->block_period = block_period;
for ( int i = 0; i < blocks; i++ )
{
// pbuffer -> block [i].datai = i * block_size;
pthread_mutex_init( &pbuffer->block[ i ].lock, NULL );
pbuffer->block[ i ].busy.clear_all( );
for ( int j = 0; j < 16; j++ )
pbuffer->block[ i ].busy16th[ j ].clear_all( );
pthread_cond_init( &pbuffer->block[ i ].notfull, NULL );
pthread_cond_init( &pbuffer->block[ i ].notempty, NULL );
pthread_cond_init( &pbuffer->block[ i ].notempty16th, NULL );
}
pbuffer->producers = 1;
pbuffer->consumers = consumers;
pbuffer->tcmask.clear_all( );
pbuffer->cmask.clear_all( );
if ( consumers )
{
for ( int j = 0; j < consumers; ++j )
{
pbuffer->tcmask.set( j );
}
pbuffer->cmask = pbuffer->tcmask;
}
pbuffer->cmask16th.clear_all( );
DEBUG1( cerr << "circ buffer constructed; blocks=" << blocks
<< " block_size=" << block_size << "\n" );
return 0;
}
// circ_buffer::circ_buffer( int consumers = 1, int blocks = 100,
//     long block_size = 10240, time_t block_period = 1,
//     mem_choice mem_flagp = flag_malloc, char* param1 = NULL )
circ_buffer::circ_buffer( int consumers,
int blocks,
long block_size,
time_t block_period,
mem_choice mem_flagp,
char* param1 )
{
if ( blocks > MAX_BLOCKS )
{
// too many blocks
assert( 0 );
}
this->mem_flag = mem_flagp;
switch ( mem_flagp )
{
case flag_malloc:
buffer_malloc( consumers, blocks, block_size, block_period );
break;
case ptr:
assert( param1 );
pbuffer = (circ_buffer_t*)param1;
break;
default:
// not implemented
assert( 0 );
}
}
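/*
Destroy the buffer mutex and the per-block synchronization primitives,
then free the buffer memory if it was malloc()ed by this object.
*/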
circ_buffer::~circ_buffer( )
{
if ( pbuffer )
{
assert( invariant( 1 ) );
pthread_mutex_destroy( &pbuffer->lock );
for ( int i = 0; i < pbuffer->blocks; i++ )
{
pthread_mutex_destroy( &pbuffer->block[ i ].lock );
pthread_cond_destroy( &pbuffer->block[ i ].notfull );
pthread_cond_destroy( &pbuffer->block[ i ].notempty );
pthread_cond_destroy( &pbuffer->block[ i ].notempty16th );
}
switch ( mem_flag )
{
case flag_malloc:
assert( pbuffer );
free( (void*)pbuffer );
DEBUG1( cerr << "circ buffer deleted\n" );
break;
default:
break;
}
}
}
// FIXME: do not broadcast on `notempty16th' if there are no fast consumers
/*
Zero out all the data in the current 16th part of the current block,
marking it as a lost data block.
FIXME: should set a `lost data' status on the block. A bit-mask could also
indicate which parts of the block are lost (16 bits for 16 parts).
*/
int
circ_buffer::put16th_dpscattered_lost( struct put_dpvec* pv,
int pvlen,
circ_buffer_block_prop_t* prop )
{
int nbi, nbi16th;
pthread_mutex_lock( &pbuffer->block[ pbuffer->next_block_in ].lock );
{
nbi = pbuffer->next_block_in;
nbi16th = pbuffer->next_block_in_16th;
#ifndef NDEBUG
if ( !nbi16th )
assert( invariant( ) );
#endif
while ( !pbuffer->block[ nbi ].busy.empty( ) )
pthread_cond_wait( &pbuffer->block[ nbi ].notfull,
&pbuffer->block[ nbi ].lock );
char* dst = pbuffer->data_space + nbi * pbuffer->block_size;
unsigned long dlen = 0;
for ( int i = 0; i < pvlen; i++ )
{
memset( dst + pv[ i ].dest_vec_idx + pv[ i ].vec_len * nbi16th,
0,
pv[ i ].vec_len );
dlen += pv[ i ].vec_len;
}
if ( prop )
pbuffer->block[ nbi ].prop16th[ nbi16th ] = *prop;
pbuffer->block[ nbi ].prop16th[ nbi16th ].seq_num = pbuffer->puts;
if ( !nbi16th )
{
pbuffer->block[ nbi ].bytes = dlen;
if ( prop )
{
pbuffer->block[ nbi ].prop = *prop;
}
pbuffer->block[ nbi ].prop.seq_num = pbuffer->puts;
}
else
pbuffer->block[ nbi ].bytes += dlen;
}
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
{
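// On the last 16th the whole block becomes readable: mark it busy for
// every consumer and advance the producer cursor. The `++x %= n' idiom
// increments with wrap-around.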
if ( nbi16th == 15 )
{
pbuffer->block[ nbi ].busy = pbuffer->cmask;
++pbuffer->next_block_in %= pbuffer->blocks;
pbuffer->puts++;
}
++pbuffer->next_block_in_16th %= 16;
pbuffer->block[ nbi ].busy16th[ nbi16th ] = pbuffer->cmask16th;
}
pthread_mutex_unlock( &pbuffer->lock );
if ( nbi16th == 15 )
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty );
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty16th );
return nbi;
}
/*
Place a new 16th part of a block in the queue; scattered data interface on
both the source and the destination.
*/
int
circ_buffer::put16th_dpscattered( struct put_dpvec* pv,
int pvlen,
circ_buffer_block_prop_t* prop )
{
int nbi, nbi16th;
pthread_mutex_lock( &pbuffer->block[ pbuffer->next_block_in ].lock );
{
nbi = pbuffer->next_block_in;
nbi16th = pbuffer->next_block_in_16th;
#ifndef NDEBUG
if ( !nbi16th )
assert( invariant( ) );
#endif
while ( !pbuffer->block[ nbi ].busy.empty( ) )
pthread_cond_wait( &pbuffer->block[ nbi ].notfull,
&pbuffer->block[ nbi ].lock );
char* dst = pbuffer->data_space + nbi * pbuffer->block_size;
unsigned long dlen = 0;
for ( int i = 0; i < pvlen; i++ )
{
assert( nbi16th < 16 && nbi16th >= 0 );
assert( pv[ i ].dest_vec_idx + pv[ i ].vec_len * nbi16th +
pv[ i ].vec_len <=
pbuffer->block_size );
int status = *( pv[ i ].src_status_addr );
#ifndef SCOPE_PROGRAM
if ( status && daqd.zero_bad_data )
{
#else
if ( status )
{
#endif
memset( dst + pv[ i ].dest_vec_idx + pv[ i ].vec_len * nbi16th,
0,
pv[ i ].vec_len );
}
else
{
void* dest =
dst + pv[ i ].dest_vec_idx + pv[ i ].vec_len * nbi16th;
void* src = (void*)pv[ i ].src_pvec_addr;
#ifdef DATA_CONCENTRATOR
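// For byte-swapped sources, `bsw' gives the element width for the
// network-to-host conversion (2 for shorts, 4 for ints); otherwise the
// data is copied as-is.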
int bsw = pv[ i ].bsw;
switch ( bsw )
{
case 2:
{
int n = pv[ i ].vec_len / bsw;
for ( int j = 0; j < n; j++ )
( (short*)dest )[ j ] = ntohs( ( (short*)src )[ j ] );
}
break;
case 4:
{
int n = pv[ i ].vec_len / bsw;
for ( int j = 0; j < n; j++ )
( (int*)dest )[ j ] = ntohl( ( (int*)src )[ j ] );
}
break;
default:
memcpy( dest, src, pv[ i ].vec_len );
break;
}
#else
memcpy( dest, src, pv[ i ].vec_len );
#endif
}
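// Byte-wise OR of a 4-byte status word into `dest'; byte access avoids
// any alignment requirement on `dest'.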
#define memor4( dest, tgt ) \
*( (unsigned char*)dest ) |= *( (unsigned char*)tgt ); \
*( ( (unsigned char*)dest ) + 1 ) |= *( ( (unsigned char*)tgt ) + 1 ); \
*( ( (unsigned char*)dest ) + 2 ) |= *( ( (unsigned char*)tgt ) + 2 ); \
*( ( (unsigned char*)dest ) + 3 ) |= *( ( (unsigned char*)tgt ) + 3 );
if ( pv[ i ].dest_status_idx != 0xffffffff )
{
if ( !nbi16th )
{
// assign status word for the first 16th of a second
//*((int *)(dst + pv [i].dest_status_idx)) = status;
memcpy( (char*)( dst + pv[ i ].dest_status_idx ),
&status,
sizeof( int ) );
}
else
{
// OR status word
// *((int *)(dst + pv [i].dest_status_idx)) |= status;
memor4( dst + pv[ i ].dest_status_idx, &status );
}
// *((int *)(dst + pv [i].dest_status_idx) + 1 + nbi16th) =
// status;
memcpy( (char*)( dst + pv[ i ].dest_status_idx +
sizeof( int ) * ( 1 + nbi16th ) ),
&status,
sizeof( int ) );
}
dlen += pv[ i ].vec_len;
}
if ( prop )
{
pbuffer->block[ nbi ].prop16th[ nbi16th ] = *prop;
}
pbuffer->block[ nbi ].prop16th[ nbi16th ].seq_num = pbuffer->puts;
if ( !nbi16th )
{
pbuffer->block[ nbi ].bytes = dlen;
if ( prop )
{
pbuffer->block[ nbi ].prop = *prop;
}
pbuffer->block[ nbi ].prop.seq_num = pbuffer->puts;
}
else
pbuffer->block[ nbi ].bytes += dlen;
}
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
{
if ( nbi16th == 15 )
{
pbuffer->block[ nbi ].busy = pbuffer->cmask;
++pbuffer->next_block_in %= pbuffer->blocks;
pbuffer->puts++;
}
++pbuffer->next_block_in_16th %= 16;
pbuffer->block[ nbi ].busy16th[ nbi16th ] = pbuffer->cmask16th;
}
pthread_mutex_unlock( &pbuffer->lock );
if ( nbi16th == 15 )
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty );
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty16th );
return nbi;
}
/*
Place new block in the queue
*/
int
circ_buffer::put( char* data, int dlen, circ_buffer_block_prop_t* prop )
{
int nbi;
/* FIXME: For multiple producers `next_block_in' access should be
* synchronized */
pthread_mutex_lock( &pbuffer->block[ pbuffer->next_block_in ].lock );
{
assert( dlen <= pbuffer->block_size && dlen >= 0 );
assert( invariant( ) );
nbi = pbuffer->next_block_in;
while ( !pbuffer->block[ nbi ].busy.empty( ) )
pthread_cond_wait( &pbuffer->block[ nbi ].notfull,
&pbuffer->block[ nbi ].lock );
memcpy( pbuffer->data_space + nbi * pbuffer->block_size, data, dlen );
pbuffer->block[ nbi ].bytes = dlen;
if ( prop )
{
pbuffer->block[ nbi ].prop = *prop;
}
pbuffer->block[ nbi ].prop.seq_num = pbuffer->puts;
}
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
{
pbuffer->block[ nbi ].busy = pbuffer->cmask;
++pbuffer->next_block_in %= pbuffer->blocks;
pbuffer->puts++;
}
pthread_mutex_unlock( &pbuffer->lock );
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty );
return nbi;
}
/*
Place new block in the queue, scattered data interface on the source
*/
int
circ_buffer::put_pscattered( struct put_pvec* pv,
int pvlen,
circ_buffer_block_prop_t* prop )
{
int nbi;
/* For multiple producers `next_block_in' access should be synchronized */
pthread_mutex_lock( &pbuffer->block[ pbuffer->next_block_in ].lock );
{
assert( invariant( ) );
nbi = pbuffer->next_block_in;
while ( !pbuffer->block[ nbi ].busy.empty( ) )
pthread_cond_wait( &pbuffer->block[ nbi ].notfull,
&pbuffer->block[ nbi ].lock );
char* dst = pbuffer->data_space + nbi * pbuffer->block_size;
unsigned long dlen = 0;
for ( int i = 0; i < pvlen; i++ )
{
memcpy( dst + dlen, pv[ i ].pvec_addr, pv[ i ].pvec_len );
dlen += pv[ i ].pvec_len;
}
pbuffer->block[ nbi ].bytes = dlen;
if ( prop )
{
pbuffer->block[ nbi ].prop = *prop;
}
pbuffer->block[ nbi ].prop.seq_num = pbuffer->puts;
}
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
{
pbuffer->block[ nbi ].busy = pbuffer->cmask;
++pbuffer->next_block_in %= pbuffer->blocks;
pbuffer->puts++;
}
pthread_mutex_unlock( &pbuffer->lock );
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty );
return nbi;
}
/*
Place a new block in the queue; drop the block if the queue is busy.
Returns -1 if the block was dropped, the block index otherwise.
*/
int
circ_buffer::put_nowait( char* data, int dlen, circ_buffer_block_prop_t* prop )
{
int ret;
int nbi;
/* For multiple producers `next_block_in' access should be synchronized */
pthread_mutex_lock( &pbuffer->block[ pbuffer->next_block_in ].lock );
assert( dlen <= pbuffer->block_size && dlen >= 0 );
assert( invariant( ) );
if ( pbuffer->block[ ret = nbi = pbuffer->next_block_in ].busy.empty( ) )
{
memcpy( pbuffer->data_space + nbi * pbuffer->block_size, data, dlen );
pbuffer->block[ nbi ].bytes = dlen;
if ( prop )
{
pbuffer->block[ nbi ].prop = *prop;
}
pbuffer->block[ nbi ].prop.seq_num = pbuffer->puts;
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
pbuffer->block[ nbi ].busy = pbuffer->cmask;
++pbuffer->next_block_in %= pbuffer->blocks;
}
else
{
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
pbuffer->drops++;
ret = -1;
}
pbuffer->puts++;
pthread_mutex_unlock( &pbuffer->lock );
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty );
return ret;
}
/*
Identical to `put_nowait', but takes scattered data.
It will read
`data + pv [0].vec_idx' for `pv [0].vec_len' bytes
.
.
.
`data + pv [pvlen - 1].vec_idx' for `pv [pvlen - 1].vec_len' bytes
*/
int
circ_buffer::put_nowait_scattered( char* data,
struct put_vec* pv,
int pvlen,
circ_buffer_block_prop_t* prop )
{
int ret;
int nbi;
/* For multiple producers `next_block_in' access should be synchronized */
pthread_mutex_lock( &pbuffer->block[ pbuffer->next_block_in ].lock );
assert( invariant( ) );
if ( pbuffer->block[ ret = nbi = pbuffer->next_block_in ].busy.empty( ) )
{
char* dst = pbuffer->data_space + nbi * pbuffer->block_size;
unsigned long dlen = 0;
for ( int i = 0; i < pvlen; i++ )
{
memcpy( dst + dlen, data + pv[ i ].vec_idx, pv[ i ].vec_len );
dlen += pv[ i ].vec_len;
}
pbuffer->block[ nbi ].bytes = dlen;
if ( prop )
{
pbuffer->block[ nbi ].prop = *prop;
}
pbuffer->block[ nbi ].prop.seq_num = pbuffer->puts;
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
pbuffer->block[ nbi ].busy = pbuffer->cmask;
++pbuffer->next_block_in %= pbuffer->blocks;
}
else
{
pthread_mutex_unlock( &pbuffer->block[ nbi ].lock );
pthread_mutex_lock( &pbuffer->lock );
pbuffer->drops++;
ret = -1;
}
pbuffer->puts++;
pthread_mutex_unlock( &pbuffer->lock );
pthread_cond_broadcast( &pbuffer->block[ nbi ].notempty );
return ret;
}
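/*
Get the index of the next 1/16th data portion for consumer number `cnum'.
Blocks until that 16th has been produced.
*/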
int
circ_buffer::get16th( int cnum )
{
int nbo, nbo16th;
pthread_mutex_lock(
&pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock );
assert( invariant( ) );
nbo = pbuffer->next_block_out[ cnum ];
nbo16th = pbuffer->next_block_out_16th[ cnum ];
while ( !( pbuffer->block[ nbo ].busy16th[ nbo16th ].get( cnum ) ) )
pthread_cond_wait( &pbuffer->block[ nbo ].notempty16th,
&pbuffer->block[ nbo ].lock );
pbuffer->next_block_out_16th[ cnum ]++;
pthread_mutex_unlock( &pbuffer->block[ nbo ].lock );
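// Pack the block index and the 16th index into one value:
// (nbo << 4) | nbo16th.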
return nbo << 4 | nbo16th;
}
/*
Get next data block index for consumer number `cnum'.
This blocks while the block is not yet filled.
Must be followed as soon as possible by a call to circ_buffer::unlock(),
or the producer will block.
avi Tue Jan 6 11:05:45 PST 1998
A race condition seems possible with one slow consumer, one fast consumer
and a fast producer: the fast producer makes a full circle around the
buffer and catches up with the slow consumer, clearing the `busy' flag
between the slow consumer's `get' and `unlock' calls. The slow consumer
then either sets `busy' negative or reads data from the buffer while the
producer puts a new block in. This could be managed by having a `busy'
flag for every consumer.
**** fixed by using bit flags in `busy', one for each consumer ****
*/
int
circ_buffer::get( int cnum )
{
int nbo;
pthread_mutex_lock(
&pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock );
assert( invariant( ) );
nbo = pbuffer->next_block_out[ cnum ];
while ( !( pbuffer->block[ nbo ].busy.get( cnum ) ) )
pthread_cond_wait( &pbuffer->block[ nbo ].notempty,
&pbuffer->block[ nbo ].lock );
pthread_mutex_unlock( &pbuffer->block[ nbo ].lock );
return nbo;
}
/*
Get next data block index for consumer number `cnum', with a timeout at
absolute time `ts'.
This blocks while the block is not yet filled.
Must be followed as soon as possible by a call to circ_buffer::unlock(),
or the producer will block.
See the race-condition note above circ_buffer::get().
Returns -1 on timeout.
*/
int
circ_buffer::timed_get( int cnum, timespec* ts )
{
int nbo;
raii::lock_guard< pthread_mutex_t > l_(
pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock );
assert( invariant( ) );
nbo = pbuffer->next_block_out[ cnum ];
while ( !( pbuffer->block[ nbo ].busy.get( cnum ) ) )
{
if ( pthread_cond_timedwait( &pbuffer->block[ nbo ].notempty,
&pbuffer->block[ nbo ].lock,
ts ) == ETIMEDOUT )
{
return -1;
}
}
return nbo;
}
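/*
Example (a sketch, not part of daqd): pthread_cond_timedwait() takes an
absolute CLOCK_REALTIME deadline, so a caller builds `ts' from the current
time plus the desired timeout. `cb' and `consumer_num' are hypothetical
names here.

timespec deadline;
clock_gettime( CLOCK_REALTIME, &deadline );
deadline.tv_sec += 1; // give up one second from now
int block = cb.timed_get( consumer_num, &deadline );
if ( block < 0 )
    ; // timed out; the consumer holds no block, so no unlock() call
*/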
/*
Nonblocking get
*/
int
circ_buffer::get_nowait( int cnum )
{
int ret;
int nbo;
pthread_mutex_lock(
&pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock );
assert( invariant( ) );
nbo = pbuffer->next_block_out[ cnum ];
if ( pbuffer->block[ nbo ].busy.get( cnum ) )
ret = nbo;
else
ret = -1;
pthread_mutex_unlock( &pbuffer->block[ nbo ].lock );
return ret;
}
/*
Free the block for the consumer; clear all 1/16th synchronization flags
*/
void
circ_buffer::unlock16th( int cnum )
{
pthread_mutex_lock(
&pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock );
assert( invariant( ) );
int nbo = pbuffer->next_block_out[ cnum ];
pbuffer->block[ nbo ].busy.clear( cnum );
bool busy = !pbuffer->block[ nbo ].busy.empty( );
for ( int i = 0; i < 16; i++ )
pbuffer->block[ nbo ].busy16th[ i ].clear( cnum );
pthread_mutex_unlock( &pbuffer->block[ nbo ].lock );
if ( busy )
pthread_cond_broadcast( &pbuffer->block[ nbo ].notempty );
else
pthread_cond_broadcast( &pbuffer->block[ nbo ].notfull );
++pbuffer->next_block_out[ cnum ] %= pbuffer->blocks;
pbuffer->next_block_out_16th[ cnum ] = 0;
}
/*
Free block for the consumer
*/
void
circ_buffer::unlock( int cnum )
{
pthread_mutex_lock(
&pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock );
assert( invariant( ) );
int nbo = pbuffer->next_block_out[ cnum ];
pbuffer->block[ nbo ].busy.clear( cnum );
bool busy = !pbuffer->block[ nbo ].busy.empty( );
pthread_mutex_unlock( &pbuffer->block[ nbo ].lock );
if ( busy )
pthread_cond_broadcast( &pbuffer->block[ nbo ].notempty );
else
pthread_cond_broadcast( &pbuffer->block[ nbo ].notfull );
++pbuffer->next_block_out[ cnum ] %= pbuffer->blocks;
}
/*
** This is executed by an inactive consumer thread
** to support synchronization
*/
int
circ_buffer::noop( int cnum )
{
pthread_mutex_lock(
&pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock );
assert( invariant( ) );
int nbo = pbuffer->next_block_out[ cnum ];
while ( !( pbuffer->block[ nbo ].busy.get( cnum ) ) )
pthread_cond_wait( &pbuffer->block[ nbo ].notempty,
&pbuffer->block[ nbo ].lock );
pbuffer->block[ nbo ].busy.clear( cnum );
bool busy = !pbuffer->block[ nbo ].busy.empty( );
pthread_mutex_unlock( &pbuffer->block[ nbo ].lock );
if ( busy )
pthread_cond_broadcast( &pbuffer->block[ nbo ].notempty );
else
pthread_cond_broadcast( &pbuffer->block[ nbo ].notfull );
++pbuffer->next_block_out[ cnum ] %= pbuffer->blocks;
return nbo;
}