Newer
Older
#define _XOPEN_SOURCE 600
#define _BSD_SOURCE 1
#define _DEFAULT_SOURCE 1
#include <config.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <sys/time.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h>
#include <sys/timeb.h>
#include <string.h>
#include <iostream>
#include <fstream>
#include <algorithm>
#include <array>
#include <memory>
#include <sys/prctl.h>
#ifdef USE_SYMMETRICOM
#ifndef USE_IOP
//#include <bcuser.h>
#include <../drv/gpstime/gpstime.h>
#endif
#endif
using namespace std;
#include "circ.hh"
#include "daqd.hh"
#include "sing_list.hh"
#include "drv/cdsHardware.h"
#include <netdb.h>
#include "net_writer.hh"
#include "drv/cdsHardware.h"
#include <sys/ioctl.h>

Jonathan Hanks
committed
#include "checksum_crc32.hh"
#include "epics_pvs.hh"
#include "raii.hh"
#include "conv.hh"
#include "circ.h"

Jonathan Hanks
committed
#include "shmem_receiver.hh"
extern daqd_c daqd;
extern int shutdown_server( );
extern unsigned int crctab[ 256 ];
struct ToLower
{
char
operator( )( char c ) const
{
return std::tolower( c );
}
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*!
* @brief return a pointer to the first dcu_msg_header_t in the
* daq_multi_dcu_header_t structure
* @param headers the headers structure
* @return pointer to the first message header.
* @note added so that standard algorithms could be used with the set of
* populated dcu headers in a block
*/
inline daq_msg_header_t*
begin( daq_multi_dcu_header_t& headers )
{
return headers.dcuheader;
}
/*!
* @brief return a pointer just past the last used dcu_msg_header_t in the
* daq_multi_dcu_header_t structure
* @param headers the headers structure
* @return pointer just passed the last used message header (as denoted by the
* dcuTotalModels field)
* @note added so that standard algorithms could be used with the set of
* populated dcu headers in a block
* @note this does not check for an invalid dcuTotalModels value
*/
inline daq_msg_header_t*
end( daq_multi_dcu_header_t& headers )
{
return begin( headers ) + headers.dcuTotalModels;
}
/* GM and shared memory communication area */
/* This may still be needed for test points */
// struct rmIpcStr gmDaqIpc[DCU_COUNT];
/// DMA memory area pointers
int controller_cycle = 0;
/// Pointer to GDS TP tables
struct cdsDaqNetGdsTpNum* gdsTpNum[ 2 ][ DCU_COUNT ];
void*
producer::frame_writer( )
{

Jonathan Hanks
committed
unsigned char* read_dest;
// circ_buffer_block_prop_t prop;
unsigned long prev_gps, prev_frac;
unsigned long gps, frac;

Jonathan Hanks
committed
checksum_crc32 crc_obj;
checksum_crc32 total_crc_obj;

Jonathan Hanks
committed
std::array< bool, DCU_COUNT > dcuSeenLastCycle{};
std::fill( dcuSeenLastCycle.begin( ), dcuSeenLastCycle.end( ), false );
char errmsgbuf[ 80 ];
stats stat_full, stat_recv, stat_crc, stat_transfer;
daqd_c::set_thread_priority(
"Producer", "dqprod", PROD_THREAD_PRIORITY, PROD_CPUAFFINITY );
auto work_queue = std::make_shared< work_queue_t >( );
work_queue::aborter< work_queue_t > queue_closer_( *work_queue );

Jonathan Hanks
committed
for ( int i = 0; i < PRODUCER_WORK_QUEUE_BUF_COUNT; ++i )
{
std::unique_ptr< producer_buf > buf_( new producer_buf );
buf_->move_buf = nullptr;
buf_->vmic_pv_len = 0;

Jonathan Hanks
committed
daqd.initialize_vmpic( &( buf_->move_buf ),
&( buf_->vmic_pv_len ),
buf_->vmic_pv,
&( buf_->dcu_move_addresses ) );
raii::array_ptr< unsigned char > mbuf_( buf_->move_buf );

Jonathan Hanks
committed
work_queue->add_to_queue( PRODUCER_WORK_QUEUE_START, buf_.get( ) );

Jonathan Hanks
committed
buf_.release( );
mbuf_.release( );

Jonathan Hanks
committed
}
// start up CRC and transfer thread
{

Jonathan Hanks
committed
DEBUG( 4, cerr << "Starting producer CRC thread" << endl );

Jonathan Hanks
committed
pthread_attr_t attr;

Jonathan Hanks
committed
pthread_attr_init( &attr );
pthread_attr_setstacksize( &attr, daqd.thread_stack_size );
pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM );
raii::lock_guard< pthread_mutex_t > crc_sync( prod_crc_mutex );
int err = launch_pthread( crc_tid, attr, [this, work_queue]( ) mutable {
this->frame_writer_crc( std::move( work_queue ) );
} );

Jonathan Hanks
committed
if ( err )
{
pthread_attr_destroy( &attr );
system_log( 1,
"pthread_create() err=%d while creating producer debug "
"crc thread",
err );
exit( 1 );

Jonathan Hanks
committed
}

Jonathan Hanks
committed
pthread_attr_destroy( &attr );

Jonathan Hanks
committed
pthread_cond_wait( &prod_crc_cond, &prod_crc_mutex );
DEBUG( 5, cerr << "producer threads synced" << endl );

Jonathan Hanks
committed
}

Jonathan Hanks
committed
// unsigned char* move_buf = nullptr;
// int vmic_pv_len = 0;
// raii::array_ptr< struct put_dpvec > _vmic_pv(
// new struct put_dpvec[ MAX_CHANNELS ] );
// struct put_dpvec* vmic_pv = _vmic_pv.get( );
//
//
// // use the offsets calculated by initialize_vmpic
// // for the start of the dcu's
// dcu_move_address dcu_move_addresses;
//
// // FIXME: move_buf could leak on errors (but we would probably die
// anyways. daqd.initialize_vmpic(
// &move_buf, &vmic_pv_len, vmic_pv, &dcu_move_addresses );
// raii::array_ptr< unsigned char > _move_buf( move_buf );
static struct cdsDaqNetGdsTpNum gds_tp_table[ 2 ][ DCU_COUNT ];
for ( int ifo = 0; ifo < daqd.data_feeds; ifo++ )
{
for ( int j = DCU_ID_EDCU; j < DCU_COUNT; j++ )
{
if ( daqd.dcuSize[ ifo ][ j ] == 0 )
if ( IS_MYRINET_DCU( j ) )
{
gdsTpNum[ ifo ][ j ] = gds_tp_table[ ifo ] + j;
}
else
{
gdsTpNum[ ifo ][ j ] = 0;
for ( int j = 0; j < DCU_COUNT; j++ )
{
rcvr_stats.push_back( s );
ShMemReceiver shmem_receiver(
daqd.parameters( ).get( "shmem_input", "local_dc" ),

Jonathan Hanks
committed
daqd.parameters( ).get< size_t >( "shmem_size", 104857600 ),

Jonathan Hanks
committed
[]( ) { PV::set_pv( PV::PV_PRDCR_NOT_STALLED, 0 ); } );
sleep( 1 );
stat_full.sample( );
// No waiting here if compiled as broadcasts receiver
int cycle_delay = daqd.cycle_delay;
// Wait until a second ends, so that the next data sould
// come in on cycle 0
// use the data as the clock here
int sync_tries = 0;
while ( true )
{
daq_dc_data_t* block = shmem_receiver.receive_data( );
if ( block->header.dcuTotalModels == 0 )
gps = block->header.dcuheader[ 0 ].timeSec;
frac = block->header.dcuheader[ 0 ].timeNSec;
std::cerr << block->header.dcuheader[ 0 ].timeNSec << std::endl;
// as of 8 Nov 2017 zmq_multi_stream sends the gps nanoseconds as a
// cycle number
if ( frac == DATA_BLOCKS - 1 || frac >= 937500000 )
if ( sync_tries > max_sync_tries )
{
std::cerr << "Unable to sync up to front ends after " << sync_tries
<< " attempts" << std::endl;
exit( 1 );
PV::set_pv( PV::PV_UPTIME_SECONDS, 0 );
PV::set_pv( PV::PV_GPS, 0 );
std::array< std::uint8_t, DCU_COUNT > dcu_recv_mask{};
std::uint32_t dcu_recv_count = 0;
std::uint64_t data_in_kb = 0;
std::uint64_t data_regular_in_kb = 0;
std::uint64_t data_tp_in_kb = 0;
PV::set_pv( PV::PV_PRDCR_UNIQUE_DCU_REPORTED_PER_S, 0 );
PV::set_pv( PV::PV_PRDCR_TOTAL_DCU_REPORTED_PER_S, 0 );
PV::set_pv( PV::PV_PRDCR_TOTAL_DATA_RATE_KB_PER_S, 0 );
PV::set_pv( PV::PV_PRDCR_TP_DATA_RATE_KB_PER_S, 0 );
PV::set_pv( PV::PV_PRDCR_MODEL_DATA_RATE_KB_PER_S, 0 );
int prev_controller_cycle = -1;
int dcu_cycle = 0;
int resync = 0;
if ( daqd.dcu_status_check & 4 )

Jonathan Hanks
committed
std::array< int, DCU_COUNT > dcu_to_zmq_lookup{};
std::array< char*, DCU_COUNT > dcu_data_from_zmq{};
std::array< unsigned int, DCU_COUNT > dcu_data_crc{};
std::array< unsigned int, DCU_COUNT > dcu_data_gps{};
for ( unsigned long i = 0;; i++ )
{ // timing
tick( ); // measure statistics
// DEBUG(6, printf("Timing %d gps=%d frac=%d\n", i, gps, frac));
std::fill( dcu_to_zmq_lookup.begin( ), dcu_to_zmq_lookup.end( ), -1 );
std::fill(
dcu_data_from_zmq.begin( ), dcu_data_from_zmq.end( ), (char*)0 );
std::fill( dcu_data_crc.begin( ), dcu_data_crc.end( ), 0 );
std::fill( dcu_data_gps.begin( ), dcu_data_gps.end( ), 0 );
stat_recv.sample( );
daq_dc_data_t* data_block = shmem_receiver.receive_data( );
stat_recv.tick( );

Jonathan Hanks
committed
PV::set_pv( PV::PV_PRDCR_NOT_STALLED, 1 );
// clear the total crc
total_crc_obj.reset( );
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
if ( i % 16 == 0 )
{
unsigned int tp_count = 0;
for ( const auto& dcu_header : data_block->header )
{
tp_count += dcu_header.tpCount;
}
PV::set_pv( PV::PV_PRDCR_OPEN_TP_COUNT, tp_count );
PV::set_pv( PV::PV_PRDCR_UNIQUE_DCU_REPORTED_PER_S,
std::accumulate( dcu_recv_mask.begin( ),
dcu_recv_mask.end( ),
static_cast< std::uint32_t >( 0 ) ) );
PV::set_pv( PV::PV_PRDCR_TOTAL_DCU_REPORTED_PER_S, dcu_recv_count );
PV::set_pv( PV::PV_PRDCR_TOTAL_DATA_RATE_KB_PER_S,
data_in_kb / 1024 );
PV::set_pv( PV::PV_PRDCR_TP_DATA_RATE_KB_PER_S,
data_tp_in_kb / 1024 );
PV::set_pv( PV::PV_PRDCR_MODEL_DATA_RATE_KB_PER_S,
data_regular_in_kb / 1024 );
std::fill( dcu_recv_mask.begin( ), dcu_recv_mask.end( ), 0 );
dcu_recv_count = 0;
data_in_kb = 0;
data_tp_in_kb = 0;
data_regular_in_kb = 0;
}
data_in_kb += data_block->header.fullDataBlockSize;
dcu_recv_count += data_block->header.dcuTotalModels;
for ( const auto& dcu_header : data_block->header )
{
dcu_header.tpCount;
dcu_recv_mask[ dcu_header.dcuId ] = 1;
data_tp_in_kb += dcu_header.tpBlockSize;
data_regular_in_kb += dcu_header.dataBlockSize;
}

Jonathan Hanks
committed
producer_buf* cur_buffer =
work_queue->get_from_queue( RECV_THREAD_INPUT );

Jonathan Hanks
committed
circ_buffer_block_prop_t* cur_prop = &cur_buffer->prop;
if ( data_block->header.dcuTotalModels > 0 )
{
gps = data_block->header.dcuheader[ 0 ].timeSec;
frac = data_block->header.dcuheader[ 0 ].timeNSec;
for ( int i = 0; i < data_block->header.dcuTotalModels; ++i )
if ( data_block->header.dcuheader[ i ].dataBlockSize == 0 )
std::cerr << "block " << i << " has 0 bytes"
<< std::endl;

Jonathan Hanks
committed
if ( i % 16 == 0 )

Jonathan Hanks
committed
for ( int j = 0; j < data_block->header.dcuTotalModels; ++j )

Jonathan Hanks
committed
int dcuid = data_block->header.dcuheader[ j ].dcuId;
gds_tp_table[ 0 ][ dcuid ].count =

Jonathan Hanks
committed
data_block->header.dcuheader[ j ].tpCount;
std::copy( &data_block->header.dcuheader[ j ].tpNum[ 0 ],
&data_block->header.dcuheader[ j ].tpNum[ 256 ],
&gds_tp_table[ 0 ][ dcuid ].tpNum[ 0 ] );

Jonathan Hanks
committed
}
{
auto expected_gps = prev_gps;
if ( prev_frac == DATA_BLOCKS - 1 || prev_frac >= 937500000 )

Jonathan Hanks
committed
++expected_gps;
auto expected_nano = prev_frac + ( 1000000000 / 16 );
if ( expected_nano >= 1000000000 )

Jonathan Hanks
committed
expected_nano = 0;
}
auto expected_cycle = DATA_BLOCKS + 10;
if ( prev_frac < DATA_BLOCKS )

Jonathan Hanks
committed
{
expected_cycle = ( prev_frac + 1 ) % DATA_BLOCKS;

Jonathan Hanks
committed
}
if ( data_block->header.dcuTotalModels == 0 ||
gps != expected_gps ||
( frac != expected_cycle && frac != expected_nano ) )

Jonathan Hanks
committed
{
fprintf(
stderr,
"Dropped data from shmem or received 0 dcus; gps now = "
"%d, %d; was = %d, %d; dcu count = %d\n",
gps,
frac,
(int)prev_gps,
(int)prev_frac,
(int)( data_block->header.dcuTotalModels ) );
fprintf( stderr, "\texpected gps = %d\n", expected_gps );
fprintf( stderr,
"\texpected cycle = %d\n\texpected nano = %d\n\n",
expected_cycle,
expected_cycle );

Jonathan Hanks
committed
exit( 1 );
// map out the order of the dcuids in the zmq data, this could change
// with each data block
{
int total_zmq_models = data_block->header.dcuTotalModels;
char* cur_dcu_zmq_ptr = data_block->dataBlock;
for ( int cur_block = 0; cur_block < total_zmq_models; ++cur_block )
{
unsigned int cur_dcuid =
data_block->header.dcuheader[ cur_block ].dcuId;
dcu_to_zmq_lookup[ cur_dcuid ] = cur_block;
dcu_data_from_zmq[ cur_dcuid ] = cur_dcu_zmq_ptr;
dcu_data_crc[ cur_dcuid ] =
data_block->header.dcuheader[ cur_block ].dataCrc;
dcu_data_gps[ cur_dcuid ] =
data_block->header.dcuheader[ cur_block ].timeSec;
cur_dcu_zmq_ptr +=
data_block->header.dcuheader[ cur_block ].dataBlockSize +
data_block->header.dcuheader[ cur_block ].tpBlockSize;

Jonathan Hanks
committed
stat_crc.sample( );

Jonathan Hanks
committed
read_dest = cur_buffer->move_buf;
for ( int j = DCU_ID_EDCU; j < DCU_COUNT; j++ )
{
// printf("DCU %d is %d bytes long\n", j, daqd.dcuSize[0][j]);
if ( daqd.dcuSize[ 0 ][ j ] == 0 || dcu_to_zmq_lookup[ j ] < 0 ||
dcu_data_from_zmq[ j ] == (void*)0 )
bool seenLastCycle = dcuSeenLastCycle[ j ];
if ( seenLastCycle )
daqd.dcuCrcErrCntPerSecond[ 0 ][ j ]++;
daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ]++;
daqd.dcuCrcErrCnt[ 0 ][ j ]++;
dcuSeenLastCycle[ j ] = false;
daqd.dcuStatus[ 0 ][ j ] = 0xbad;
dcuSeenLastCycle[ j ] = true;

Jonathan Hanks
committed
read_dest = cur_buffer->dcu_move_addresses.start[ j ];
long read_size = daqd.dcuDAQsize[ 0 ][ j ];
if ( IS_MYRINET_DCU( j ) )
{
dcu_cycle = i % DAQ_NUM_DATA_BLOCKS;
// dcu_cycle = gmDaqIpc[j].cycle;
// printf("cycl=%d ctrl=%d dcu=%d\n", gmDaqIpc[j].cycle,
// controller_cycle, j);
// Get the data from myrinet
// Get the data from the buffers returned by the zmq receiver
int zmq_index = dcu_to_zmq_lookup[ j ];
daq_msg_header_t& cur_dcu =
data_block->header.dcuheader[ zmq_index ];
if ( read_size != cur_dcu.dataBlockSize )
{
std::cerr << "read_size = " << read_size << " cur dcu size "
<< cur_dcu.dataBlockSize << std::endl;
// assert(read_size == cur_dcu.dataBlockSize);
memcpy( (void*)read_dest,
dcu_data_from_zmq[ j ],
cur_dcu.dataBlockSize );
int max_tp_size =
2 * DAQ_DCU_BLOCK_SIZE - cur_dcu.dataBlockSize;
int tp_size =
( cur_dcu.tpBlockSize <= max_tp_size ? cur_dcu.tpBlockSize
: max_tp_size );
memcpy( (void*)( read_dest + cur_dcu.dataBlockSize ),
(void*)( ( (char*)dcu_data_from_zmq[ j ] ) +
cur_dcu.dataBlockSize ),
tp_size );

Jonathan Hanks
committed
int tp_off = reinterpret_cast< char* >( cur_buffer->move_buf ) -
reinterpret_cast< char* >( read_dest ) +
cur_dcu.dataBlockSize;
int cblk1 = ( i + 1 ) % DAQ_NUM_DATA_BLOCKS;
static const int ifo = 0; // For now
// Calculate DCU status, if needed
if ( daqd.dcu_status_check & ( 1 << ifo ) )
{
if ( cblk1 % 16 == 0 )
{
int lastStatus = dcuStatus[ ifo ][ j ];
if ( dcuStatCycle[ ifo ][ j ] == 0 )
dcuStatus[ ifo ][ j ] = DAQ_STATE_SYNC_ERR;
dcuStatus[ ifo ][ j ] = DAQ_STATE_RUN;
// dcuCycleStatus shows how many matches of cycle number
// we got
DEBUG( 4,
cerr << "dcuid=" << j << " dcuCycleStatus="
<< dcuCycleStatus[ ifo ][ j ]
<< " dcuStatCycle="
<< dcuStatCycle[ ifo ][ j ] << endl );
if ( ( dcuCycleStatus[ ifo ][ j ] > 3 || j < 5 ) &&
dcuStatCycle[ ifo ][ j ] > 4 )
{
dcuStatus[ ifo ][ j ] = DAQ_STATE_RUN;
if ( /* (lastStatus == DAQ_STATE_RUN) && */ (
dcuStatus[ ifo ][ j ] != DAQ_STATE_RUN ) )
{
DEBUG( 4,
cerr << "Lost " << daqd.dcuName[ j ]
<< "(ifo " << ifo << "; dcu " << j
<< "); status "
<< dcuCycleStatus[ ifo ][ j ]
<< dcuStatCycle[ ifo ][ j ] << endl );
std::cout << "Lost " << daqd.dcuName[ j ] << "(ifo "
<< ifo << "; dcu " << j << "); status "
<< dcuCycleStatus[ ifo ][ j ]
<< dcuStatCycle[ ifo ][ j ] << endl;
cur_dcu.status = DAQ_STATE_FAULT;
}
if ((dcuStatus[ifo][j] ==
DAQ_STATE_RUN) /* && (lastStatus != DAQ_STATE_RUN) */)
{
DEBUG( 4,
cerr << "New " << daqd.dcuName[ j ]
<< " (dcu " << j << ")" << endl );
dcuCycleStatus[ ifo ][ j ] = 0;
dcuStatCycle[ ifo ][ j ] = 0;
cur_dcu.status = cur_dcu.status;
}
{
int intCycle = cur_dcu.cycle % DAQ_NUM_DATA_BLOCKS;
if ( intCycle != dcuLastCycle[ ifo ][ j ] )
dcuStatCycle[ ifo ][ j ]++;
dcuLastCycle[ ifo ][ j ] = intCycle;
}
}
// Update DCU status
int newStatus = cur_dcu.status != DAQ_STATE_RUN ? 0xbad : 0;
DEBUG( 4,
std::cout << "newStatus = " << ( hex ) << newStatus
<< " cur_dcu.status = " << ( dec )
<< cur_dcu.status;
std::cout << " gps = " << cur_dcu.timeSec << " gps_n = "
<< cur_dcu.timeNSec << std::endl; );
int newCrc = cur_dcu.fileCrc;
// printf("%x\n", *((int *)read_dest));
if ( !IS_EXC_DCU( j ) )
{
if ( newCrc != daqd.dcuConfigCRC[ 0 ][ j ] )
{
DEBUG( 4,
std::cout << "config crc mismatch expecting "
<< std::hex
<< daqd.dcuConfigCRC[ 0 ][ j ]
<< " got " << std::hex << newCrc
<< std::endl; );
if ( newStatus != daqd.dcuStatus[ 0 ][ j ] )
{
// system_log(1, "DCU %d IFO %d (%s) %s", j, 0,
// daqd.dcuName[j], newStatus? "fault": "running");
if ( newStatus & 0x2000 )
{
// system_log(1, "DCU %d IFO %d (%s) reconfigured (crc
// 0x%x rfm 0x%x)", j, 0, daqd.dcuName[j],
// daqd.dcuConfigCRC[0][j], newCrc);
}
}
daqd.dcuStatus[ 0 ][ j ] = newStatus;
daqd.dcuCycle[ 0 ][ j ] = cur_dcu.cycle;
unsigned long bytes = read_size;
unsigned char* cp = (unsigned char*)read_dest;

Jonathan Hanks
committed

Jonathan Hanks
committed
crc_obj.add( cp, bytes );

Jonathan Hanks
committed
auto crc = crc_obj.result( );
total_crc_obj.add( &crc, sizeof( crc ) );

Jonathan Hanks
committed
crc_obj.reset( );

Jonathan Hanks
committed
int cblk = i % 16;
// Reset CRC/second variable for this DCU
if ( cblk == 0 )
{
daqd.dcuCrcErrCntPerSecond[ 0 ][ j ] =
daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ];
daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ] = 0;
read_dest += 2 * DAQ_DCU_BLOCK_SIZE; // cur_dcu.dataBlockSize;
if ( j >= DCU_ID_ADCU_1 && ( !IS_TP_DCU( j ) ) &&
daqd.dcuStatus[ 0 ][ j ] == 0 )
{
unsigned int rfm_crc =
dcu_data_crc[ j ]; // gmDaqIpc[j].bp[cblk].crc;
unsigned int dcu_gps =
dcu_data_gps[ j ]; // gmDaqIpc[j].bp[cblk].timeSec;
// system_log(5, "dcu %d block %d cycle %d gps %d symm
// %d\n", j, cblk, gmDaqIpc[j].bp[cblk].cycle, dcu_gps,
// gps);
unsigned long mygps = gps;
// if (cblk > (15 - cycle_delay))
if ( dcu_gps != mygps )
{
daqd.dcuStatus[ 0 ][ j ] |= 0x4000;
system_log(
5,
"GPS MISS dcu %d (%s); dcu_gps=%d gps=%ld\n",
j,
daqd.dcuName[ j ],
dcu_gps,
mygps );
if ( rfm_crc != crc )
{
system_log(
5,
"MISS dcu %d (%s); crc[%d]=%x; computed crc=%lx\n",
j,
daqd.dcuName[ j ],
cblk,
rfm_crc,
crc );
/* Set DCU status to BAD, all data will be marked as BAD
because of the CRC mismatch */
daqd.dcuStatus[ 0 ][ j ] |= 0x1000;
else
{
system_log( 10,
" MATCH dcu %d (%s); crc[%d]=%x; "
"computed crc=%lx\n",
j,
daqd.dcuName[ j ],
cblk,
rfm_crc,
crc );
}
if ( daqd.dcuStatus[ 0 ][ j ] )
{
daqd.dcuCrcErrCnt[ 0 ][ j ]++;
daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ]++;

Jonathan Hanks
committed
stat_crc.tick( );
int cblk = i % 16;
// Assign per-DCU data we need to broadcast out
//
for ( int ifo = 0; ifo < daqd.data_feeds; ifo++ )
{
for ( int j = DCU_ID_EDCU; j < DCU_COUNT; j++ )
{
if ( IS_TP_DCU( j ) )
if ( daqd.dcuSize[ ifo ][ j ] == 0 )

Jonathan Hanks
committed
cur_prop->dcu_data[ j + ifo * DCU_COUNT ].cycle =
daqd.dcuCycle[ ifo ][ j ];
volatile struct rmIpcStr* ipc = daqd.dcuIpc[ ifo ][ j ];
if ( IS_MYRINET_DCU( j ) && ifo == 0 )
{

Jonathan Hanks
committed
cur_prop->dcu_data[ j ].crc =
dcu_data_crc[ j ]; // gmDaqIpc[j].bp[cblk].crc;
// printf("dcu %d crc=0x%x\n", j, prop.dcu_data[j].crc);
// Remove 0x8000 status from propagating to the broadcast
// receivers

Jonathan Hanks
committed
cur_prop->dcu_data[ j ].status =
daqd.dcuStatus[ 0 /* IFO */ ][ j ] & ~0x8000;
}
else
{

Jonathan Hanks
committed
cur_prop->dcu_data[ j + ifo * DCU_COUNT ].crc =
ipc->bp[ cblk ].crc;

Jonathan Hanks
committed
cur_prop->dcu_data[ j + ifo * DCU_COUNT ].status =
daqd.dcuStatus[ ifo ][ j ];
}
}
}
// prop.gps = time(0) - 315964819 + 33;

Jonathan Hanks
committed
cur_prop->gps = gps;
// if (cblk > (15 - cycle_delay))

Jonathan Hanks
committed
cur_prop->gps_n = 1000000000 / 16 * ( i % 16 );
// printf("before put %d %d %d\n", prop.gps, prop.gps_n, frac);
// std::cout << "about to call put16th_dpscattered with " << vmic_pv_len
// << " entries. prop.gps = " << prop.gps << " prop.gps_n = " <<
// prop.gps_n << "\n"; for (int ii = 0; ii < vmic_pv_len; ++ii)
// std::cout << std::endl;
work_queue->add_to_queue( RECV_THREAD_OUTPUT, cur_buffer );

Jonathan Hanks
committed
cur_buffer = nullptr;
PV::set_pv( PV::PV_CYCLE, i );

Jonathan Hanks
committed
PV::set_pv( PV::PV_GPS, gps );
// DEBUG1(cerr << "gps=" << PV::pv(PV::PV_GPS) << endl);
if ( i % 16 == 0 )
{
PV::pv( PV::PV_UPTIME_SECONDS )++;
{
extern unsigned long dmt_retransmit_count;
extern unsigned long dmt_failed_retransmit_count;
// Display DMT retransmit channels every second
PV::set_pv( PV::PV_BCAST_RETR, dmt_retransmit_count );
PV::set_pv( PV::PV_BCAST_FAILED_RETR,
dmt_failed_retransmit_count );
dmt_retransmit_count = 0;
dmt_failed_retransmit_count = 0;
}
}
stat_full.tick( );
if ( stat_cycles >= 16 )
{
PV::set_pv( PV::PV_PRDCR_TIME_FULL_MIN_MS,
conv::s_to_ms_int( stat_full.getMin( ) ) );
PV::set_pv( PV::PV_PRDCR_TIME_FULL_MAX_MS,
conv::s_to_ms_int( stat_full.getMax( ) ) );
PV::set_pv( PV::PV_PRDCR_TIME_FULL_MEAN_MS,
conv::s_to_ms_int( stat_full.getMean( ) ) );
PV::set_pv( PV::PV_PRDCR_TIME_RECV_MIN_MS,
conv::s_to_ms_int( stat_recv.getMin( ) ) );
PV::set_pv( PV::PV_PRDCR_TIME_RECV_MAX_MS,
conv::s_to_ms_int( stat_recv.getMax( ) ) );
PV::set_pv( PV::PV_PRDCR_TIME_RECV_MEAN_MS,
conv::s_to_ms_int( stat_recv.getMean( ) ) );
PV::set_pv( PV::PV_PRDCR_CRC_TIME_CRC_MIN_MS,
conv::s_to_ms_int( stat_crc.getMin( ) ) );
PV::set_pv( PV::PV_PRDCR_CRC_TIME_CRC_MAX_MS,
conv::s_to_ms_int( stat_crc.getMax( ) ) );
PV::set_pv( PV::PV_PRDCR_CRC_TIME_CRC_MEAN_MS,
conv::s_to_ms_int( stat_crc.getMean( ) ) );

Jonathan Hanks
committed
PV::set_pv( PV::PV_PRDCR_DATA_CRC, total_crc_obj.result( ) );
stat_full.clearStats( );
stat_crc.clearStats( );
stat_recv.clearStats( );
stat_full.sample( );
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
// printf("gps=%d prev_gps=%d bfrac=%d prev_frac=%d\n", gps, prev_gps,
// frac, prev_frac);
/*const int polls_per_sec = 320; // 320 polls gives 1 millisecond stddev
// of cycle time (AEI Nov 2012)
for (int ntries = 0;; ntries++) {
struct timespec tspec = {
0, 1000000000 / polls_per_sec}; // seconds, nanoseconds
nanosleep(&tspec, NULL);
gps = daqd.symm_gps(&frac);
if (prev_frac == 937500000) {
if (gps == prev_gps + 1) {
frac = 0;
break;
} else {
if (gps > prev_gps + 1) {
fprintf(stderr, "GPS card time jumped from %ld (%ld) "
"to %ld (%ld)\n",
prev_gps, prev_frac, gps, frac);
print(cout);
_exit(1);
} else if (gps < prev_gps) {
fprintf(stderr,
"GPS card time moved back from %ld to %ld\n",
prev_gps, gps);
print(cout);
_exit(1);
}
}
} else if (frac >= prev_frac + 62500000) {
// Check if GPS seconds moved for some reason (because of delay)
if (gps != prev_gps) {
fprintf(stderr, "WARNING: GPS time jumped from %ld (%ld) "
"to %ld (%ld)\n",
prev_gps, prev_frac, gps, frac);
print(cout);
gps = prev_gps;
}
frac = prev_frac + 62500000;
break;
}
if (ntries >= polls_per_sec) {
fprintf(stderr, "Symmetricom GPS timeout\n");
exit(1);
}
}*/
// printf("gps=%d prev_gps=%d ifrac=%d prev_frac=%d\n", gps, prev_gps,
// frac, prev_frac);
controller_cycle++;
prev_controller_cycle = controller_cycle;
prev_gps = gps;
prev_frac = frac;
}
}
/// A main loop for a producer that does crc and data transfer
/// in a seperate thread.
void
producer::frame_writer_crc( shared_work_queue_ptr work_queue )

Jonathan Hanks
committed
int stat_cycles = 0;
stats stat_transfer;

Jonathan Hanks
committed
// PV::set_pv(PV::PV_UPTIME_SECONDS, 0);
// PV::set_pv(PV::PV_GPS, 0);

Jonathan Hanks
committed
// Set thread parameters

Jonathan Hanks
committed
daqd_c::set_thread_priority( "Producer crc",
"dqprodcrc",
PROD_CRC_THREAD_PRIORITY,
PROD_CRC_CPUAFFINITY );

Jonathan Hanks
committed

Jonathan Hanks
committed
// checksum_crc32 crc_obj;

Jonathan Hanks
committed
{

Jonathan Hanks
committed
raii::lock_guard< pthread_mutex_t > lock_{ prod_crc_mutex };
pthread_cond_signal( &prod_crc_cond );

Jonathan Hanks
committed
}

Jonathan Hanks
committed
for ( unsigned long i = 0;; ++i )
{

Jonathan Hanks
committed
unsigned int gps, gps_n;

Jonathan Hanks
committed
producer_buf* cur_buffer =
work_queue->get_from_queue( CRC_THREAD_INPUT );

Jonathan Hanks
committed
circ_buffer_block_prop_t* cur_prop = &( cur_buffer->prop );

Jonathan Hanks
committed
gps = cur_prop->gps;
gps_n = cur_prop->gps_n;

Jonathan Hanks
committed
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
// unsigned char *move_buf = cur_buffer->move_buf;
// stat_full.sample();
// stat_crc.sample();
// // Parse received broadcast transmission header and
// // check config file CRCs and data CRCs, check DCU size and
// number
// // Assign DCU status and cycle.
// unsigned int *header =
// (unsigned int *)(((char *)move_buf) -
// BROADCAST_HEADER_SIZE);
// int ndcu = ntohl(*header++);
// // printf("ndcu = %d\n", ndcu);
// if (ndcu > 0 && ndcu <= MAX_BROADCAST_DCU_NUM) {
// int data_offs = 0; // Offset to the current DCU data
// for (int j = 0; j < ndcu; j++) {
// unsigned int dcu_number;
// unsigned int dcu_size; // Data size for this DCU
// unsigned int config_crc; // Configuration file CRC
// unsigned int dcu_crc; // Data CRC
// unsigned int status; // DCU status word bits (0-ok,
// 0xbad-out of
// // sync, 0x1000-trasm error
// // 0x2000 - configuration mismatch).
// unsigned int cycle; // DCU cycle
// dcu_number = ntohl(*header++);
// dcu_size = ntohl(*header++);
// config_crc = ntohl(*header++);
// dcu_crc = ntohl(*header++);
// status = ntohl(*header++);
// cycle = ntohl(*header++);
// int ifo = 0;
// if (dcu_number > DCU_COUNT) {
// ifo = 1;
// dcu_number -= DCU_COUNT;
// }
// // printf("dcu=%d size=%d config_crc=0x%x crc=0x%x
// status=0x%x
// // cycle=%d\n",
// // dcu_number, dcu_size, config_crc, dcu_crc, status,
// cycle); if (daqd.dcuSize[ifo][dcu_number]) { // Don't
// do anything if
// // this DCU is not
// // configured
// daqd.dcuStatus[ifo][dcu_number] = status;
// daqd.dcuCycle[ifo][dcu_number] = cycle;
// if (status ==
// 0) { // If the DCU status is OK from the
// concentrator
// // Check for local configuration and data
// mismatch if (config_crc !=
// daqd.dcuConfigCRC[ifo][dcu_number]) {
// // Detected local configuration mismach
// daqd.dcuStatus[ifo][dcu_number] |= 0x2000;
// }
// unsigned char *cp =
// move_buf + data_offs; // Start of data
// unsigned int bytes = dcu_size; // DCU data
// size crc_obj.add(cp, bytes);
//// unsigned int crc = 0;
//// // Calculate DCU data CRC
//// while (bytes--) {
//// crc = (crc << 8) ^
//// crctab[((crc >> 24) ^ *(cp++)) &
/// 0xFF]; / } / bytes =
/// dcu_size; / while (bytes > 0) { / crc = (crc

Jonathan Hanks
committed
///<< 8) ^ / crctab[((crc >> 24) ^
/// bytes) & 0xFF]; / bytes >>= 8; / } / crc
/// = ~crc & 0xFFFFFFFF;

Jonathan Hanks
committed
// unsigned int crc = crc_obj.result();
// if (crc != dcu_crc) {
// // Detected data corruption !!!
// daqd.dcuStatus[ifo][dcu_number] |= 0x1000;
// DEBUG1(printf(
// "ifo=%d dcu=%d calc_crc=0x%x
// data_crc=0x%x\n", ifo, dcu_number,
// crc, dcu_crc));
// }
// crc_obj.reset();
// }
// }
// data_offs += dcu_size;
// }
// }
// stat_crc.tick();
//
// // :TODO: make sure all DCUs configuration matches; restart
// when the
// // mismatch detected