Skip to content
Snippets Groups Projects
producer_shmem.cc 40.1 KiB
Newer Older
Jonathan Hanks's avatar
Jonathan Hanks committed

#define _XOPEN_SOURCE 600
#define _BSD_SOURCE 1
#define _DEFAULT_SOURCE 1

#include <config.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <sys/time.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h>
#include <sys/timeb.h>
#include <string.h>
#include <iostream>
#include <fstream>
#include <algorithm>
#include <array>
#include <cctype>
#include <numeric>
Jonathan Hanks's avatar
Jonathan Hanks committed
#include <sys/prctl.h>

#ifdef USE_SYMMETRICOM
#ifndef USE_IOP
//#include <bcuser.h>
#include <../drv/gpstime/gpstime.h>
#endif
#endif

using namespace std;

#include "circ.hh"
#include "daqd.hh"
#include "sing_list.hh"
#include "drv/cdsHardware.h"
#include <netdb.h>
#include "net_writer.hh"
#include "drv/cdsHardware.h"

#include <sys/ioctl.h>

Jonathan Hanks's avatar
Jonathan Hanks committed
#include "epics_pvs.hh"

#include "raii.hh"
#include "conv.hh"
#include "circ.h"
#include "daq_core.h"
#include "daqd_thread.hh"
Jonathan Hanks's avatar
Jonathan Hanks committed

extern daqd_c       daqd;
extern int          shutdown_server( );
extern unsigned int crctab[ 256 ];
Jonathan Hanks's avatar
Jonathan Hanks committed

extern long int altzone;

struct ToLower
{
    char
    operator( )( char c ) const
    {
        return std::tolower( c );
    }
Jonathan Hanks's avatar
Jonathan Hanks committed
};

/*!
 * @brief return a pointer to the first dcu_msg_header_t in the
 * daq_multi_dcu_header_t structure
 * @param headers the headers structure
 * @return pointer to the first message header.
 * @note added so that standard algorithms could be used with the set of
 * populated dcu headers in a block
 */
inline daq_msg_header_t*
begin( daq_multi_dcu_header_t& headers )
{
    return headers.dcuheader;
}

/*!
 * @brief return a pointer just past the last used dcu_msg_header_t in the
 * daq_multi_dcu_header_t structure
 * @param headers the headers structure
 * @return pointer just passed the last used message header (as denoted by the
 * dcuTotalModels field)
 * @note added so that standard algorithms could be used with the set of
 * populated dcu headers in a block
 * @note this does not check for an invalid dcuTotalModels value
 */
inline daq_msg_header_t*
end( daq_multi_dcu_header_t& headers )
{
    return begin( headers ) + headers.dcuTotalModels;
}

Jonathan Hanks's avatar
Jonathan Hanks committed
/* GM and shared memory communication area */

/* This may still be needed for test points */
// struct rmIpcStr gmDaqIpc[DCU_COUNT];
Jonathan Hanks's avatar
Jonathan Hanks committed
/// DMA memory area pointers
int controller_cycle = 0;

/// Pointer to GDS TP tables
struct cdsDaqNetGdsTpNum* gdsTpNum[ 2 ][ DCU_COUNT ];
Jonathan Hanks's avatar
Jonathan Hanks committed

/// The main data movement thread (the producer)
    unsigned char* read_dest;
    // circ_buffer_block_prop_t prop;
Jonathan Hanks's avatar
Jonathan Hanks committed

    unsigned long prev_gps, prev_frac;
    unsigned long gps, frac;

    checksum_crc32 total_crc_obj;
    // last status value
    std::array< bool, DCU_COUNT > dcuSeenLastCycle{};
    std::fill( dcuSeenLastCycle.begin( ), dcuSeenLastCycle.end( ), false );
Jonathan Hanks's avatar
Jonathan Hanks committed
    // error message buffer
Jonathan Hanks's avatar
Jonathan Hanks committed
    unsigned long stat_cycles = 0;
    stats         stat_full, stat_recv, stat_crc, stat_transfer;
Jonathan Hanks's avatar
Jonathan Hanks committed

    // Set thread parameters
    daqd_c::set_thread_priority(
        "Producer", "dqprod", PROD_THREAD_PRIORITY, PROD_CPUAFFINITY );
Jonathan Hanks's avatar
Jonathan Hanks committed

    auto work_queue = std::make_shared< work_queue_t >( );
    work_queue::aborter< work_queue_t > queue_closer_( *work_queue );
    for ( int i = 0; i < PRODUCER_WORK_QUEUE_BUF_COUNT; ++i )
    {
        std::unique_ptr< producer_buf > buf_( new producer_buf );

        buf_->move_buf = nullptr;
        buf_->vmic_pv_len = 0;

        daqd.initialize_vmpic( &( buf_->move_buf ),
                               &( buf_->vmic_pv_len ),
                               buf_->vmic_pv,
                               &( buf_->dcu_move_addresses ) );
        raii::array_ptr< unsigned char > mbuf_( buf_->move_buf );
        work_queue->add_to_queue( PRODUCER_WORK_QUEUE_START, buf_.get( ) );
        DEBUG( 4, cerr << "Starting producer CRC thread" << endl );
        pthread_attr_init( &attr );
        pthread_attr_setstacksize( &attr, daqd.thread_stack_size );
        pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM );

        raii::lock_guard< pthread_mutex_t > crc_sync( prod_crc_mutex );
        int err = launch_pthread( crc_tid, attr, [this, work_queue]( ) mutable {
            this->frame_writer_crc( std::move( work_queue ) );
        } );

        if ( err )
        {
            pthread_attr_destroy( &attr );
            system_log( 1,
                        "pthread_create() err=%d while creating producer debug "
                        "crc thread",
                        err );
            exit( 1 );
Jonathan Hanks's avatar
Jonathan Hanks committed

        pthread_cond_wait( &prod_crc_cond, &prod_crc_mutex );
        DEBUG( 5, cerr << "producer threads synced" << endl );
    //    unsigned char*                      move_buf = nullptr;
    //    int                                 vmic_pv_len = 0;
    //    raii::array_ptr< struct put_dpvec > _vmic_pv(
    //        new struct put_dpvec[ MAX_CHANNELS ] );
    //    struct put_dpvec* vmic_pv = _vmic_pv.get( );
    //
    //
    //    // use the offsets calculated by initialize_vmpic
    //    // for the start of the dcu's
    //    dcu_move_address dcu_move_addresses;
    //
    //    // FIXME: move_buf could leak on errors (but we would probably die
    //    anyways. daqd.initialize_vmpic(
    //        &move_buf, &vmic_pv_len, vmic_pv, &dcu_move_addresses );
    //    raii::array_ptr< unsigned char > _move_buf( move_buf );
Jonathan Hanks's avatar
Jonathan Hanks committed

    // Allocate local test point tables
    static struct cdsDaqNetGdsTpNum gds_tp_table[ 2 ][ DCU_COUNT ];
Jonathan Hanks's avatar
Jonathan Hanks committed

    for ( int ifo = 0; ifo < daqd.data_feeds; ifo++ )
    {
        for ( int j = DCU_ID_EDCU; j < DCU_COUNT; j++ )
        {
            if ( daqd.dcuSize[ ifo ][ j ] == 0 )
Jonathan Hanks's avatar
Jonathan Hanks committed
                continue; // skip unconfigured DCU nodes
            if ( IS_MYRINET_DCU( j ) )
            {
                gdsTpNum[ ifo ][ j ] = gds_tp_table[ ifo ] + j;
            }
            else
            {
                gdsTpNum[ ifo ][ j ] = 0;
    for ( int j = 0; j < DCU_COUNT; j++ )
    {
Jonathan Hanks's avatar
Jonathan Hanks committed
        class stats s;
    ShMemReceiver shmem_receiver(
Jonathan Hanks's avatar
Jonathan Hanks committed
        daqd.parameters( ).get( "shmem_input", "local_dc" ),
        daqd.parameters( ).get< size_t >( "shmem_size", 104857600 ),
        []( ) { PV::set_pv( PV::PV_PRDCR_NOT_STALLED, 0 ); } );
Jonathan Hanks's avatar
Jonathan Hanks committed

Jonathan Hanks's avatar
Jonathan Hanks committed

Jonathan Hanks's avatar
Jonathan Hanks committed

    // No waiting here if compiled as broadcasts receiver

    int cycle_delay = daqd.cycle_delay;

    // Wait until a second ends, so that the next data sould
    // come in on cycle 0
    // use the data as the clock here
    int sync_tries = 0;
Jonathan Hanks's avatar
Jonathan Hanks committed
        const int max_sync_tries = 10 * DATA_BLOCKS;

        daq_dc_data_t* block = shmem_receiver.receive_data( );
Jonathan Hanks's avatar
Jonathan Hanks committed
        ++sync_tries;
        if ( block->header.dcuTotalModels == 0 )
Jonathan Hanks's avatar
Jonathan Hanks committed
            continue;
        gps = block->header.dcuheader[ 0 ].timeSec;
        frac = block->header.dcuheader[ 0 ].timeNSec;
        std::cerr << block->header.dcuheader[ 0 ].timeNSec << std::endl;
        // as of 8 Nov 2017 zmq_multi_stream sends the gps nanoseconds as a
        // cycle number
        if ( frac == DATA_BLOCKS - 1 || frac >= 937500000 )
Jonathan Hanks's avatar
Jonathan Hanks committed
            break;
        if ( sync_tries > max_sync_tries )
        {
            std::cerr << "Unable to sync up to front ends after " << sync_tries
                      << " attempts" << std::endl;
            exit( 1 );
Jonathan Hanks's avatar
Jonathan Hanks committed
        }
    }
    prev_gps = gps;
    prev_frac = frac;

    PV::set_pv( PV::PV_UPTIME_SECONDS, 0 );
    PV::set_pv( PV::PV_GPS, 0 );
Jonathan Hanks's avatar
Jonathan Hanks committed

    std::array< std::uint8_t, DCU_COUNT > dcu_recv_mask{};
    std::uint32_t                         dcu_recv_count = 0;
    std::uint64_t                         data_in_kb = 0;
    std::uint64_t                         data_regular_in_kb = 0;
    std::uint64_t                         data_tp_in_kb = 0;
    PV::set_pv( PV::PV_PRDCR_UNIQUE_DCU_REPORTED_PER_S, 0 );
    PV::set_pv( PV::PV_PRDCR_TOTAL_DCU_REPORTED_PER_S, 0 );
    PV::set_pv( PV::PV_PRDCR_TOTAL_DATA_RATE_KB_PER_S, 0 );
    PV::set_pv( PV::PV_PRDCR_TP_DATA_RATE_KB_PER_S, 0 );
    PV::set_pv( PV::PV_PRDCR_MODEL_DATA_RATE_KB_PER_S, 0 );

Jonathan Hanks's avatar
Jonathan Hanks committed
    int prev_controller_cycle = -1;
    int dcu_cycle = 0;
    int resync = 0;

    if ( daqd.dcu_status_check & 4 )
Jonathan Hanks's avatar
Jonathan Hanks committed
        resync = 1;

    std::array< int, DCU_COUNT >          dcu_to_zmq_lookup{};
    std::array< char*, DCU_COUNT >        dcu_data_from_zmq{};
    std::array< unsigned int, DCU_COUNT > dcu_data_crc{};
    std::array< unsigned int, DCU_COUNT > dcu_data_gps{};

    for ( unsigned long i = 0;; i++ )
    { // timing
        tick( ); // measure statistics
Jonathan Hanks's avatar
Jonathan Hanks committed

        // DEBUG(6, printf("Timing %d gps=%d frac=%d\n", i, gps, frac));

        std::fill( dcu_to_zmq_lookup.begin( ), dcu_to_zmq_lookup.end( ), -1 );
        std::fill(
            dcu_data_from_zmq.begin( ), dcu_data_from_zmq.end( ), (char*)0 );
        std::fill( dcu_data_crc.begin( ), dcu_data_crc.end( ), 0 );
        std::fill( dcu_data_gps.begin( ), dcu_data_gps.end( ), 0 );
Jonathan Hanks's avatar
Jonathan Hanks committed
        // retreive 1/16s of data from zmq
        stat_recv.sample( );
        daq_dc_data_t* data_block = shmem_receiver.receive_data( );
        stat_recv.tick( );
        PV::set_pv( PV::PV_PRDCR_NOT_STALLED, 1 );
        // clear the total crc
        total_crc_obj.reset( );

        if ( i % 16 == 0 )
        {
            unsigned int tp_count = 0;
            for ( const auto& dcu_header : data_block->header )
            {
                tp_count += dcu_header.tpCount;
            }
            PV::set_pv( PV::PV_PRDCR_OPEN_TP_COUNT, tp_count );
            PV::set_pv( PV::PV_PRDCR_UNIQUE_DCU_REPORTED_PER_S,
                        std::accumulate( dcu_recv_mask.begin( ),
                                         dcu_recv_mask.end( ),
                                         static_cast< std::uint32_t >( 0 ) ) );
            PV::set_pv( PV::PV_PRDCR_TOTAL_DCU_REPORTED_PER_S, dcu_recv_count );
            PV::set_pv( PV::PV_PRDCR_TOTAL_DATA_RATE_KB_PER_S,
                        data_in_kb / 1024 );
            PV::set_pv( PV::PV_PRDCR_TP_DATA_RATE_KB_PER_S,
                        data_tp_in_kb / 1024 );
            PV::set_pv( PV::PV_PRDCR_MODEL_DATA_RATE_KB_PER_S,
                        data_regular_in_kb / 1024 );
            std::fill( dcu_recv_mask.begin( ), dcu_recv_mask.end( ), 0 );
            dcu_recv_count = 0;
            data_in_kb = 0;
            data_tp_in_kb = 0;
            data_regular_in_kb = 0;
        }
        data_in_kb += data_block->header.fullDataBlockSize;
        dcu_recv_count += data_block->header.dcuTotalModels;
        for ( const auto& dcu_header : data_block->header )
        {
            dcu_header.tpCount;
            dcu_recv_mask[ dcu_header.dcuId ] = 1;
            data_tp_in_kb += dcu_header.tpBlockSize;
            data_regular_in_kb += dcu_header.dataBlockSize;
        }

            work_queue->get_from_queue( RECV_THREAD_INPUT );
        circ_buffer_block_prop_t* cur_prop = &cur_buffer->prop;

        if ( data_block->header.dcuTotalModels > 0 )
        {
            gps = data_block->header.dcuheader[ 0 ].timeSec;
            frac = data_block->header.dcuheader[ 0 ].timeNSec;
                for ( int i = 0; i < data_block->header.dcuTotalModels; ++i )
Jonathan Hanks's avatar
Jonathan Hanks committed
                {
                    if ( data_block->header.dcuheader[ i ].dataBlockSize == 0 )
                        std::cerr << "block " << i << " has 0 bytes"
                                  << std::endl;
                for ( int j = 0; j < data_block->header.dcuTotalModels; ++j )
                    int dcuid = data_block->header.dcuheader[ j ].dcuId;
                    gds_tp_table[ 0 ][ dcuid ].count =
                        data_block->header.dcuheader[ j ].tpCount;
                    std::copy( &data_block->header.dcuheader[ j ].tpNum[ 0 ],
                               &data_block->header.dcuheader[ j ].tpNum[ 256 ],
                               &gds_tp_table[ 0 ][ dcuid ].tpNum[ 0 ] );
Jonathan Hanks's avatar
Jonathan Hanks committed
            }
            if ( prev_frac == DATA_BLOCKS - 1 || prev_frac >= 937500000 )
            auto expected_nano = prev_frac + ( 1000000000 / 16 );
            if ( expected_nano >= 1000000000 )
                expected_nano = 0;
            }
            auto expected_cycle = DATA_BLOCKS + 10;
            if ( prev_frac < DATA_BLOCKS )
                expected_cycle = ( prev_frac + 1 ) % DATA_BLOCKS;
            }
            if ( data_block->header.dcuTotalModels == 0 ||
                 gps != expected_gps ||
                 ( frac != expected_cycle && frac != expected_nano ) )
            {
                fprintf(
                    stderr,
                    "Dropped data from shmem or received 0 dcus; gps now = "
                    "%d, %d; was = %d, %d; dcu count = %d\n",
                    gps,
                    frac,
                    (int)prev_gps,
                    (int)prev_frac,
                    (int)( data_block->header.dcuTotalModels ) );

                fprintf( stderr, "\texpected gps = %d\n", expected_gps );
                fprintf( stderr,
                         "\texpected cycle = %d\n\texpected nano = %d\n\n",
                         expected_cycle,
                         expected_cycle );
Jonathan Hanks's avatar
Jonathan Hanks committed
            }
Jonathan Hanks's avatar
Jonathan Hanks committed

        // map out the order of the dcuids in the zmq data, this could change
        // with each data block
        {
            int   total_zmq_models = data_block->header.dcuTotalModels;
            char* cur_dcu_zmq_ptr = data_block->dataBlock;
            for ( int cur_block = 0; cur_block < total_zmq_models; ++cur_block )
            {
                unsigned int cur_dcuid =
                    data_block->header.dcuheader[ cur_block ].dcuId;
                dcu_to_zmq_lookup[ cur_dcuid ] = cur_block;
                dcu_data_from_zmq[ cur_dcuid ] = cur_dcu_zmq_ptr;
                dcu_data_crc[ cur_dcuid ] =
                    data_block->header.dcuheader[ cur_block ].dataCrc;
                dcu_data_gps[ cur_dcuid ] =
                    data_block->header.dcuheader[ cur_block ].timeSec;
                cur_dcu_zmq_ptr +=
                    data_block->header.dcuheader[ cur_block ].dataBlockSize +
                    data_block->header.dcuheader[ cur_block ].tpBlockSize;
        for ( int j = DCU_ID_EDCU; j < DCU_COUNT; j++ )
        {
Jonathan Hanks's avatar
Jonathan Hanks committed
            // printf("DCU %d is %d bytes long\n", j, daqd.dcuSize[0][j]);
            if ( daqd.dcuSize[ 0 ][ j ] == 0 || dcu_to_zmq_lookup[ j ] < 0 ||
                 dcu_data_from_zmq[ j ] == (void*)0 )
                bool seenLastCycle = dcuSeenLastCycle[ j ];
                if ( seenLastCycle )
                    daqd.dcuCrcErrCntPerSecond[ 0 ][ j ]++;
                    daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ]++;
                    daqd.dcuCrcErrCnt[ 0 ][ j ]++;
                dcuSeenLastCycle[ j ] = false;
                daqd.dcuStatus[ 0 ][ j ] = 0xbad;
Jonathan Hanks's avatar
Jonathan Hanks committed
                continue; // skip unconfigured DCU nodes
            dcuSeenLastCycle[ j ] = true;
            read_dest = cur_buffer->dcu_move_addresses.start[ j ];
            long read_size = daqd.dcuDAQsize[ 0 ][ j ];
            if ( IS_MYRINET_DCU( j ) )
            {
Jonathan Hanks's avatar
Jonathan Hanks committed
                dcu_cycle = i % DAQ_NUM_DATA_BLOCKS;

                // dcu_cycle = gmDaqIpc[j].cycle;
                // printf("cycl=%d ctrl=%d dcu=%d\n", gmDaqIpc[j].cycle,
                // controller_cycle, j);
                // Get the data from myrinet
                // Get the data from the buffers returned by the zmq receiver
                int               zmq_index = dcu_to_zmq_lookup[ j ];
                daq_msg_header_t& cur_dcu =
                    data_block->header.dcuheader[ zmq_index ];
                if ( read_size != cur_dcu.dataBlockSize )
                {
                    std::cerr << "read_size = " << read_size << " cur dcu size "
                              << cur_dcu.dataBlockSize << std::endl;
                // assert(read_size == cur_dcu.dataBlockSize);
                memcpy( (void*)read_dest,
                        dcu_data_from_zmq[ j ],
                        cur_dcu.dataBlockSize );
                // copy test points over
                int max_tp_size =
                    2 * DAQ_DCU_BLOCK_SIZE - cur_dcu.dataBlockSize;
                int tp_size =
                    ( cur_dcu.tpBlockSize <= max_tp_size ? cur_dcu.tpBlockSize
                                                         : max_tp_size );
                memcpy( (void*)( read_dest + cur_dcu.dataBlockSize ),
                        (void*)( ( (char*)dcu_data_from_zmq[ j ] ) +
                                 cur_dcu.dataBlockSize ),
                        tp_size );
                int tp_off = reinterpret_cast< char* >( cur_buffer->move_buf ) -
                    reinterpret_cast< char* >( read_dest ) +
                    cur_dcu.dataBlockSize;

                int              cblk1 = ( i + 1 ) % DAQ_NUM_DATA_BLOCKS;
Jonathan Hanks's avatar
Jonathan Hanks committed
                static const int ifo = 0; // For now

                // Calculate DCU status, if needed
                if ( daqd.dcu_status_check & ( 1 << ifo ) )
                {
                    if ( cblk1 % 16 == 0 )
                    {
                        int lastStatus = dcuStatus[ ifo ][ j ];
Jonathan Hanks's avatar
Jonathan Hanks committed

                        /* Check if DCU running at all */
                        if ( dcuStatCycle[ ifo ][ j ] == 0 )
                            dcuStatus[ ifo ][ j ] = DAQ_STATE_SYNC_ERR;
                            dcuStatus[ ifo ][ j ] = DAQ_STATE_RUN;
Jonathan Hanks's avatar
Jonathan Hanks committed
                        // dcuCycleStatus shows how many matches of cycle number
                        // we got
                        DEBUG( 4,
                               cerr << "dcuid=" << j << " dcuCycleStatus="
                                    << dcuCycleStatus[ ifo ][ j ]
                                    << " dcuStatCycle="
                                    << dcuStatCycle[ ifo ][ j ] << endl );
Jonathan Hanks's avatar
Jonathan Hanks committed

                        /* Check if DCU running and in sync */
                        if ( ( dcuCycleStatus[ ifo ][ j ] > 3 || j < 5 ) &&
                             dcuStatCycle[ ifo ][ j ] > 4 )
                        {
                            dcuStatus[ ifo ][ j ] = DAQ_STATE_RUN;
                        if ( /* (lastStatus == DAQ_STATE_RUN) && */ (
                            dcuStatus[ ifo ][ j ] != DAQ_STATE_RUN ) )
                        {
                            DEBUG( 4,
                                   cerr << "Lost " << daqd.dcuName[ j ]
                                        << "(ifo " << ifo << "; dcu " << j
                                        << "); status "
                                        << dcuCycleStatus[ ifo ][ j ]
                                        << dcuStatCycle[ ifo ][ j ] << endl );
                            std::cout << "Lost " << daqd.dcuName[ j ] << "(ifo "
                                      << ifo << "; dcu " << j << "); status "
                                      << dcuCycleStatus[ ifo ][ j ]
                                      << dcuStatCycle[ ifo ][ j ] << endl;
Jonathan Hanks's avatar
Jonathan Hanks committed
                            cur_dcu.status = DAQ_STATE_FAULT;
                        }

                        if ((dcuStatus[ifo][j] ==
                             DAQ_STATE_RUN) /* && (lastStatus != DAQ_STATE_RUN) */)
                        {
                            DEBUG( 4,
                                   cerr << "New " << daqd.dcuName[ j ]
                                        << " (dcu " << j << ")" << endl );
Jonathan Hanks's avatar
Jonathan Hanks committed
                            cur_dcu.status = DAQ_STATE_RUN;
                        }

                        dcuCycleStatus[ ifo ][ j ] = 0;
                        dcuStatCycle[ ifo ][ j ] = 0;
Jonathan Hanks's avatar
Jonathan Hanks committed
                        cur_dcu.status = cur_dcu.status;
                    }

                    {
                        int intCycle = cur_dcu.cycle % DAQ_NUM_DATA_BLOCKS;
                        if ( intCycle != dcuLastCycle[ ifo ][ j ] )
                            dcuStatCycle[ ifo ][ j ]++;
                        dcuLastCycle[ ifo ][ j ] = intCycle;
Jonathan Hanks's avatar
Jonathan Hanks committed
                    }
                }

                // Update DCU status
                int newStatus = cur_dcu.status != DAQ_STATE_RUN ? 0xbad : 0;
                DEBUG( 4,
                       std::cout << "newStatus = " << ( hex ) << newStatus
                                 << " cur_dcu.status = " << ( dec )
                                 << cur_dcu.status;
                       std::cout << " gps = " << cur_dcu.timeSec << " gps_n = "
                                 << cur_dcu.timeNSec << std::endl; );
Jonathan Hanks's avatar
Jonathan Hanks committed
                int newCrc = cur_dcu.fileCrc;

                // printf("%x\n", *((int *)read_dest));
                if ( !IS_EXC_DCU( j ) )
                {
                    if ( newCrc != daqd.dcuConfigCRC[ 0 ][ j ] )
                    {
Jonathan Hanks's avatar
Jonathan Hanks committed
                        newStatus |= 0x2000;
                        DEBUG( 4,
                               std::cout << "config crc mismatch expecting "
                                         << std::hex
                                         << daqd.dcuConfigCRC[ 0 ][ j ]
                                         << " got " << std::hex << newCrc
                                         << std::endl; );
Jonathan Hanks's avatar
Jonathan Hanks committed
                    }
                }
                if ( newStatus != daqd.dcuStatus[ 0 ][ j ] )
                {
Jonathan Hanks's avatar
Jonathan Hanks committed
                    // system_log(1, "DCU %d IFO %d (%s) %s", j, 0,
                    // daqd.dcuName[j], newStatus? "fault": "running");
Jonathan Hanks's avatar
Jonathan Hanks committed
                        // system_log(1, "DCU %d IFO %d (%s) reconfigured (crc
                        // 0x%x rfm 0x%x)", j, 0, daqd.dcuName[j],
                        // daqd.dcuConfigCRC[0][j], newCrc);
                    }
                }
                daqd.dcuStatus[ 0 ][ j ] = newStatus;
Jonathan Hanks's avatar
Jonathan Hanks committed

                daqd.dcuCycle[ 0 ][ j ] = cur_dcu.cycle;
Jonathan Hanks's avatar
Jonathan Hanks committed

                /* Check DCU data checksum */
                unsigned long  bytes = read_size;
                unsigned char* cp = (unsigned char*)read_dest;
                total_crc_obj.add( &crc, sizeof( crc ) );
Jonathan Hanks's avatar
Jonathan Hanks committed
                int cblk = i % 16;
                // Reset CRC/second variable for this DCU
                if ( cblk == 0 )
                {
                    daqd.dcuCrcErrCntPerSecond[ 0 ][ j ] =
                        daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ];
                    daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ] = 0;
                read_dest += 2 * DAQ_DCU_BLOCK_SIZE; // cur_dcu.dataBlockSize;

                if ( j >= DCU_ID_ADCU_1 && ( !IS_TP_DCU( j ) ) &&
                     daqd.dcuStatus[ 0 ][ j ] == 0 )
                {
Jonathan Hanks's avatar
Jonathan Hanks committed

                    unsigned int rfm_crc =
                        dcu_data_crc[ j ]; // gmDaqIpc[j].bp[cblk].crc;
                    unsigned int dcu_gps =
                        dcu_data_gps[ j ]; // gmDaqIpc[j].bp[cblk].timeSec;
Jonathan Hanks's avatar
Jonathan Hanks committed

                    // system_log(5, "dcu %d block %d cycle %d  gps %d symm
                    // %d\n", j, cblk, gmDaqIpc[j].bp[cblk].cycle,  dcu_gps,
                    // gps);
                    unsigned long mygps = gps;
                    // if (cblk > (15 - cycle_delay))
Jonathan Hanks's avatar
Jonathan Hanks committed
                    //    mygps--;

                    if ( dcu_gps != mygps )
                    {
                        daqd.dcuStatus[ 0 ][ j ] |= 0x4000;
                        system_log(
                            5,
                            "GPS MISS dcu %d (%s); dcu_gps=%d gps=%ld\n",
                            j,
                            daqd.dcuName[ j ],
                            dcu_gps,
                            mygps );
Jonathan Hanks's avatar
Jonathan Hanks committed
                        system_log(
                            5,
                            "MISS dcu %d (%s); crc[%d]=%x; computed crc=%lx\n",
Jonathan Hanks's avatar
Jonathan Hanks committed

                        /* Set DCU status to BAD, all data will be marked as BAD
                           because of the CRC mismatch */
                        daqd.dcuStatus[ 0 ][ j ] |= 0x1000;
Jonathan Hanks's avatar
Jonathan Hanks committed
                    }
                    else
                    {
                        system_log( 10,
                                    " MATCH dcu %d (%s); crc[%d]=%x; "
                                    "computed crc=%lx\n",
                                    j,
                                    daqd.dcuName[ j ],
                                    cblk,
                                    rfm_crc,
                                    crc );
                    }
                    if ( daqd.dcuStatus[ 0 ][ j ] )
                    {
                        daqd.dcuCrcErrCnt[ 0 ][ j ]++;
                        daqd.dcuCrcErrCntPerSecondRunning[ 0 ][ j ]++;
Jonathan Hanks's avatar
Jonathan Hanks committed
        int cblk = i % 16;

        // Assign per-DCU data we need to broadcast out
        //
        for ( int ifo = 0; ifo < daqd.data_feeds; ifo++ )
        {
            for ( int j = DCU_ID_EDCU; j < DCU_COUNT; j++ )
            {
                if ( IS_TP_DCU( j ) )
Jonathan Hanks's avatar
Jonathan Hanks committed
                    continue; // Skip TP and EXC DCUs
                if ( daqd.dcuSize[ ifo ][ j ] == 0 )
Jonathan Hanks's avatar
Jonathan Hanks committed
                    continue; // Skip unconfigured DCUs
                cur_prop->dcu_data[ j + ifo * DCU_COUNT ].cycle =
                    daqd.dcuCycle[ ifo ][ j ];
                volatile struct rmIpcStr* ipc = daqd.dcuIpc[ ifo ][ j ];
Jonathan Hanks's avatar
Jonathan Hanks committed

                // Do not support Myrinet DCUs on H2
                if ( IS_MYRINET_DCU( j ) && ifo == 0 )
                {
Jonathan Hanks's avatar
Jonathan Hanks committed

                        dcu_data_crc[ j ]; // gmDaqIpc[j].bp[cblk].crc;
Jonathan Hanks's avatar
Jonathan Hanks committed

                    // printf("dcu %d crc=0x%x\n", j, prop.dcu_data[j].crc);
                    // Remove 0x8000 status from propagating to the broadcast
                    // receivers
                        daqd.dcuStatus[ 0 /* IFO */ ][ j ] & ~0x8000;
                }
                else
                {
                    cur_prop->dcu_data[ j + ifo * DCU_COUNT ].crc =
                    cur_prop->dcu_data[ j + ifo * DCU_COUNT ].status =
Jonathan Hanks's avatar
Jonathan Hanks committed
                }
            }
        }

        // prop.gps = time(0) - 315964819 + 33;

        // if (cblk > (15 - cycle_delay))
Jonathan Hanks's avatar
Jonathan Hanks committed
        //    prop.gps--;

        cur_prop->gps_n = 1000000000 / 16 * ( i % 16 );
        // printf("before put %d %d %d\n", prop.gps, prop.gps_n, frac);
Jonathan Hanks's avatar
Jonathan Hanks committed

        // std::cout << "about to call put16th_dpscattered with " << vmic_pv_len
        // << " entries. prop.gps = " << prop.gps << " prop.gps_n = " <<
        // prop.gps_n << "\n"; for (int ii = 0; ii < vmic_pv_len; ++ii)
Jonathan Hanks's avatar
Jonathan Hanks committed
        //    std::cout << " " << *vmic_pv[ii].src_status_addr;
Jonathan Hanks's avatar
Jonathan Hanks committed

        work_queue->add_to_queue( RECV_THREAD_OUTPUT, cur_buffer );
Jonathan Hanks's avatar
Jonathan Hanks committed

        PV::set_pv( PV::PV_CYCLE, i );
Jonathan Hanks's avatar
Jonathan Hanks committed
        // DEBUG1(cerr << "gps=" << PV::pv(PV::PV_GPS) << endl);
Jonathan Hanks's avatar
Jonathan Hanks committed
            // Count how many seconds we were acquiring data
            PV::pv( PV::PV_UPTIME_SECONDS )++;
Jonathan Hanks's avatar
Jonathan Hanks committed

            {
                extern unsigned long dmt_retransmit_count;
                extern unsigned long dmt_failed_retransmit_count;
                // Display DMT retransmit channels every second
                PV::set_pv( PV::PV_BCAST_RETR, dmt_retransmit_count );
                PV::set_pv( PV::PV_BCAST_FAILED_RETR,
                            dmt_failed_retransmit_count );
Jonathan Hanks's avatar
Jonathan Hanks committed
                dmt_retransmit_count = 0;
                dmt_failed_retransmit_count = 0;
            }
        }

Jonathan Hanks's avatar
Jonathan Hanks committed

        ++stat_cycles;
        if ( stat_cycles >= 16 )
        {
            PV::set_pv( PV::PV_PRDCR_TIME_FULL_MIN_MS,
                        conv::s_to_ms_int( stat_full.getMin( ) ) );
            PV::set_pv( PV::PV_PRDCR_TIME_FULL_MAX_MS,
                        conv::s_to_ms_int( stat_full.getMax( ) ) );
            PV::set_pv( PV::PV_PRDCR_TIME_FULL_MEAN_MS,
                        conv::s_to_ms_int( stat_full.getMean( ) ) );

            PV::set_pv( PV::PV_PRDCR_TIME_RECV_MIN_MS,
                        conv::s_to_ms_int( stat_recv.getMin( ) ) );
            PV::set_pv( PV::PV_PRDCR_TIME_RECV_MAX_MS,
                        conv::s_to_ms_int( stat_recv.getMax( ) ) );
            PV::set_pv( PV::PV_PRDCR_TIME_RECV_MEAN_MS,
                        conv::s_to_ms_int( stat_recv.getMean( ) ) );

            PV::set_pv( PV::PV_PRDCR_CRC_TIME_CRC_MIN_MS,
                        conv::s_to_ms_int( stat_crc.getMin( ) ) );
            PV::set_pv( PV::PV_PRDCR_CRC_TIME_CRC_MAX_MS,
                        conv::s_to_ms_int( stat_crc.getMax( ) ) );
            PV::set_pv( PV::PV_PRDCR_CRC_TIME_CRC_MEAN_MS,
                        conv::s_to_ms_int( stat_crc.getMean( ) ) );
            PV::set_pv( PV::PV_PRDCR_DATA_CRC, total_crc_obj.result( ) );

            stat_full.clearStats( );
            stat_crc.clearStats( );
            stat_recv.clearStats( );
Jonathan Hanks's avatar
Jonathan Hanks committed
            stat_cycles = 0;
        }

Jonathan Hanks's avatar
Jonathan Hanks committed

        // printf("gps=%d  prev_gps=%d bfrac=%d prev_frac=%d\n", gps, prev_gps,
        // frac, prev_frac);
        /*const int polls_per_sec = 320; // 320 polls gives 1 millisecond stddev
                                       // of cycle time (AEI Nov 2012)
        for (int ntries = 0;; ntries++) {
            struct timespec tspec = {
                0, 1000000000 / polls_per_sec}; // seconds, nanoseconds
            nanosleep(&tspec, NULL);

            gps = daqd.symm_gps(&frac);

            if (prev_frac == 937500000) {
                if (gps == prev_gps + 1) {
                    frac = 0;
                    break;
                } else {
                    if (gps > prev_gps + 1) {
                        fprintf(stderr, "GPS card time jumped from %ld (%ld) "
                                        "to %ld (%ld)\n",
                                prev_gps, prev_frac, gps, frac);
                        print(cout);
                        _exit(1);
                    } else if (gps < prev_gps) {
                        fprintf(stderr,
                                "GPS card time moved back from %ld to %ld\n",
                                prev_gps, gps);
                        print(cout);
                        _exit(1);
                    }
                }
            } else if (frac >= prev_frac + 62500000) {
                // Check if GPS seconds moved for some reason (because of delay)
                if (gps != prev_gps) {
                    fprintf(stderr, "WARNING: GPS time jumped from %ld (%ld) "
                                    "to %ld (%ld)\n",
                            prev_gps, prev_frac, gps, frac);
                    print(cout);
                    gps = prev_gps;
                }
                frac = prev_frac + 62500000;
                break;
            }

            if (ntries >= polls_per_sec) {

                fprintf(stderr, "Symmetricom GPS timeout\n");

                exit(1);
            }
        }*/
        // printf("gps=%d prev_gps=%d ifrac=%d prev_frac=%d\n", gps,  prev_gps,
        // frac, prev_frac);
        controller_cycle++;

        prev_controller_cycle = controller_cycle;

        prev_gps = gps;
        prev_frac = frac;
    }
}

/// A main loop for a producer that does crc  and data transfer
/// in a seperate thread.
void
producer::frame_writer_crc( shared_work_queue_ptr work_queue )
    //    PV::set_pv(PV::PV_UPTIME_SECONDS, 0);
    //    PV::set_pv(PV::PV_GPS, 0);
    daqd_c::set_thread_priority( "Producer crc",
                                 "dqprodcrc",
                                 PROD_CRC_THREAD_PRIORITY,
                                 PROD_CRC_CPUAFFINITY );
        raii::lock_guard< pthread_mutex_t > lock_{ prod_crc_mutex };
        pthread_cond_signal( &prod_crc_cond );
            work_queue->get_from_queue( CRC_THREAD_INPUT );
        circ_buffer_block_prop_t* cur_prop = &( cur_buffer->prop );
        //        unsigned char *move_buf = cur_buffer->move_buf;

        //        stat_full.sample();
        //        stat_crc.sample();
        //        // Parse received broadcast transmission header and
        //        // check config file CRCs and data CRCs, check DCU size and
        //        number
        //        // Assign DCU status and cycle.
        //        unsigned int *header =
        //            (unsigned int *)(((char *)move_buf) -
        //            BROADCAST_HEADER_SIZE);
        //        int ndcu = ntohl(*header++);
        //        // printf("ndcu = %d\n", ndcu);
        //        if (ndcu > 0 && ndcu <= MAX_BROADCAST_DCU_NUM) {
        //            int data_offs = 0; // Offset to the current DCU data
        //            for (int j = 0; j < ndcu; j++) {
        //                unsigned int dcu_number;
        //                unsigned int dcu_size;   // Data size for this DCU
        //                unsigned int config_crc; // Configuration file CRC
        //                unsigned int dcu_crc;    // Data CRC
        //                unsigned int status; // DCU status word bits (0-ok,
        //                0xbad-out of
        //                // sync, 0x1000-trasm error
        //                // 0x2000 - configuration mismatch).
        //                unsigned int cycle;  // DCU cycle
        //                dcu_number = ntohl(*header++);
        //                dcu_size = ntohl(*header++);
        //                config_crc = ntohl(*header++);
        //                dcu_crc = ntohl(*header++);
        //                status = ntohl(*header++);
        //                cycle = ntohl(*header++);
        //                int ifo = 0;
        //                if (dcu_number > DCU_COUNT) {
        //                    ifo = 1;
        //                    dcu_number -= DCU_COUNT;
        //                }
        //                // printf("dcu=%d size=%d config_crc=0x%x crc=0x%x
        //                status=0x%x
        //                // cycle=%d\n",
        //                // dcu_number, dcu_size, config_crc, dcu_crc, status,
        //                cycle); if (daqd.dcuSize[ifo][dcu_number]) { // Don't
        //                do anything if
        //                    // this DCU is not
        //                    // configured
        //                    daqd.dcuStatus[ifo][dcu_number] = status;
        //                    daqd.dcuCycle[ifo][dcu_number] = cycle;
        //                    if (status ==
        //                        0) { // If the DCU status is OK from the
        //                        concentrator
        //                        // Check for local configuration and data
        //                        mismatch if (config_crc !=
        //                        daqd.dcuConfigCRC[ifo][dcu_number]) {
        //                            // Detected local configuration mismach
        //                            daqd.dcuStatus[ifo][dcu_number] |= 0x2000;
        //                        }
        //                        unsigned char *cp =
        //                            move_buf + data_offs; // Start of data
        //                        unsigned int bytes = dcu_size; // DCU data
        //                        size crc_obj.add(cp, bytes);
        ////                        unsigned int crc = 0;
        ////                        // Calculate DCU data CRC
        ////                        while (bytes--) {
        ////                            crc = (crc << 8) ^
        ////                                  crctab[((crc >> 24) ^ *(cp++)) &
        /// 0xFF]; /                        } /                        bytes =
        /// dcu_size; /                        while (bytes > 0) { / crc = (crc
        /// bytes) & 0xFF]; /                            bytes >>= 8; / } / crc
        /// = ~crc & 0xFFFFFFFF;
        //                        unsigned int crc = crc_obj.result();
        //                        if (crc != dcu_crc) {
        //                            // Detected data corruption !!!
        //                            daqd.dcuStatus[ifo][dcu_number] |= 0x1000;
        //                            DEBUG1(printf(
        //                                "ifo=%d dcu=%d calc_crc=0x%x
        //                                data_crc=0x%x\n", ifo, dcu_number,
        //                                crc, dcu_crc));
        //                        }
        //                        crc_obj.reset();
        //                    }
        //                }
        //                data_offs += dcu_size;
        //            }
        //        }
        //        stat_crc.tick();
        //
        //        // :TODO: make sure all DCUs configuration matches; restart
        //        when the
        //        // mismatch detected