Skip to content
Snippets Groups Projects
daqd.cc 77.3 KiB
Newer Older
/* daqd.cc - Main daqd source code file */

#include <config.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <signal.h>

#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/ioctl.h>
#include <sys/mman.h>

#include <string>
#include <iostream>
#include <fstream>
Jonathan Hanks's avatar
Jonathan Hanks committed

#ifndef DAQD_CPP11
#error DAQD_CPP11 must be defined
Jonathan Hanks's avatar
Jonathan Hanks committed
#include <unordered_map>

#include <memory>

#include "framecpp/Common/MD5SumFilter.hh"
#include "run_number_client.hh"
#include "daqmap.h"
extern "C" {
#include "crc.h"
#include "param.h"
}

using namespace std;

#include "circ.hh"
#include "y.tab.h"
#include "FlexLexer.h"
#include "channel.hh"
#include "daqc.h"
#include "daqd.hh"
#include "sing_list.hh"
#include "daqd_thread.hh"

#include <stdio.h>
#include <stdarg.h>
#include <time.h>
#include <string.h>

#include "epics_pvs.hh"

/// Helper function to deal with the archive channels.
daqd_c::configure_archive_channels( char* archive_name,
                                    char* file_name,
                                    char* f1 )
{
    archive_c* arc = 0;
    // Find archive
    for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
    {
        archive_c* a = (archive_c*)clink;
        if ( !strcmp( a->fsd.get_path( ), archive_name ) )
            arc = a;
    }
    if ( !arc )
        return DAQD_NOT_FOUND;

    // Read config file
    int res = arc->load_config( file_name );
    if ( res == 0 && f1 )
        res = arc->load_old_config( f1 );
    return res ? DAQD_NOT_FOUND : DAQD_OK;
/// Remove an archive from the list of known archives
/// :TODO: have to use locking here to avoid deleteing "live" archives
daqd_c::delete_archive( char* name )
{
    for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
    {
        archive_c* a = (archive_c*)clink;
        if ( !strcmp( a->fsd.get_path( ), name ) )
        {
            archive.remove( clink );
            return DAQD_OK;
        }
    }
    return DAQD_NOT_FOUND;
/// Create new archive if it's not created already
/// Set archive's prefix, suffix, number of Data dirs; scan archive file names
daqd_c::scan_archive( char* name, char* prefix, char* suffix, int ndirs )
{
    archive_c* arc = 0;
    for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
    {
        archive_c* a = (archive_c*)clink;
        if ( !strcmp( a->fsd.get_path( ), name ) )
            arc = a;
    }

    if ( !arc )
    {
        void* mptr = malloc( sizeof( archive_c ) );
        if ( !mptr )
            return DAQD_MALLOC;
        archive.insert( arc = new ( mptr ) archive_c( ) );
    }

    return arc->scan( name, prefix, suffix, ndirs );
/// Update file directory info for the archive
/// Called when new file is written into the archive by an external archive
/// writer
daqd_c::update_archive( char*         name,
                        unsigned long gps,
                        unsigned long dt,
                        unsigned int  dir_num )
{
    archive_c* arc = 0;
    for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
    {
        archive_c* a = (archive_c*)clink;
        if ( !strcmp( a->fsd.get_path( ), name ) )
            arc = a;
    }
    if ( !arc )
        return DAQD_NOT_FOUND;
    int res = arc->fsd.update_dir( gps, 0, dt, dir_num );
    if ( res )
        return DAQD_MALLOC;
    else
        return DAQD_OK;
extern void* interpreter_no_prompt( void* );
int          shutdown_server( );
/// Server shutdown flag
bool server_is_shutting_down = false;

daqd_c daqd; ///< root object
/// Is set to the program's executable name during run time
char* programname;
/// Mutual exclusion on Frames library calls
pthread_mutex_t framelib_lock;

#ifndef NDEBUG
/// Controls volume of the debugging messages that is printed out
/// Controls volume of log messages
//#include "../../src/drv/param.c"
struct cmp_struct
{
    bool
    operator( )( char* a, char* b )
    {
        return !strcmp( a, b );
    }
};
/// Sort on IFO number and DCU id
/// Do not change channel order within a DCU
int
chan_dcu_eq( const void* a, const void* b )
{
    unsigned int dcu1, dcu2;
    dcu1 = ( (channel_t*)a )->dcu_id + DCU_COUNT * ( (channel_t*)a )->ifoid;
    dcu2 = ( (channel_t*)b )->dcu_id + DCU_COUNT * ( (channel_t*)b )->ifoid;
    if ( dcu1 == dcu2 )
        return ( (channel_t*)a )->seq_num - ( (channel_t*)b )->seq_num;
    else
        return dcu1 - dcu2;
/// DCU id of the current configuration file (ini file)
/// Broadcast channel configuration callback function.
int
bcstConfigCallback( char* name, struct CHAN_PARAM* parm, void* user )
{
    printf( "Broadcast channel %s configured\n", name );
    daqd.broadcast_set.insert( name );
    return 1;
/// Remove spaces in place
void
RemoveSpaces( char* source )
    char* i = source;
    char* j = source;
    while ( *j != 0 )
    {
        *i = *j++;
        if ( *i != ' ' )
            i++;
    }
    *i = 0;
/// Configure data channel info from config files
daqd_c::configure_channels_files( )
    // See if we have configured broadcast channel file
    // where the set of channels to broadcast to the DMT is specified
    //
    if ( broadcast_config.compare( "" ) )
    {
        unsigned long crc = 0;
        if ( 0 ==
             parseConfigFile( (char*)broadcast_config.c_str( ),
                              &crc,
                              bcstConfigCallback,
                              0,
                              0,
                              0 ) )
        {
            printf( "Failed to parse broadcast config file %s\n",
                    broadcast_config.c_str( ) );
            return 1;
        }
    }
    // error message buffer
    char errmsgbuf[ 80 ];

    // File names are specified in `master_config' file
    FILE* mcf = NULL;
    mcf = fopen( master_config.c_str( ), "r" );
    if ( mcf == NULL )
    {
        strerror_r( errno, errmsgbuf, sizeof( errmsgbuf ) );
        system_log( 1,
                    "failed to open `%s' for reading: %s",
                    master_config.c_str( ),
                    errmsgbuf );
        return 1;
    }

    num_channels = 0;
    num_active_channels = 0;
    num_science_channels = 0;
    memset( channels, 0, sizeof( channels[ 0 ] ) * daqd_c::max_channels );

    for ( ;; )
    {
        unsigned long crc = 0;
        int   chanConfigCallback( char*, struct CHAN_PARAM*, void* user );
        int   testpoint = 0;
        char  buf[ 1024 ];
        char* c = fgets( buf, 1024, mcf );
        if ( feof( mcf ) )
            break;
        if ( *buf == '#' )
            continue;
        RemoveSpaces( buf );
        if ( strlen( buf ) > 0 )
        {
            if ( buf[ strlen( buf ) - 1 ] == '\n' )
                buf[ strlen( buf ) - 1 ] = 0;
        }
        if ( strlen( buf ) == 0 )
            continue;

        if ( strlen( buf ) > 4 )
        {
            testpoint = !strcmp( buf + strlen( buf ) - 4, ".par" );
        }

        ini_file_dcu_id = 0;
        if ( 0 ==
             parseConfigFile( buf, &crc, chanConfigCallback, testpoint, 0, 0 ) )
        {
            printf( "Failed to parse config file %s\n", buf );
            return 1;
        }
        // DEBUG(1, cerr << "Channel config: dcu " <<
        // daqd.channels[daqd.num_channels - 1].dcu_id << " crc=0x" << hex <<
        // crc
        // << dec << endl); printf("%s has dcuid=%d\n", buf, ini_file_dcu_id);
        if ( daqd.num_channels )
        {
            daqd.dcuConfigCRC[ daqd.channels[ daqd.num_channels - 1 ].ifoid ]
                             [ daqd.channels[ daqd.num_channels - 1 ].dcu_id ] =
                crc;
        }

        if ( ini_file_dcu_id > 0 && ini_file_dcu_id < DCU_COUNT )
        {
            // only set DCU name if this is an INI file (*.ini)
            if ( !strcmp( buf + strlen( buf ) - 4, ".ini" ) )
            {
                char* slp = strrchr( buf, '/' );
                if ( slp )
                {
                    slp += 3;
                    buf[ strlen( buf ) - 4 ] = 0;
                    sprintf( daqd.dcuName[ ini_file_dcu_id ], "%.31s", slp );
                    sprintf(
                        daqd.fullDcuName[ ini_file_dcu_id ], "%.31s", slp - 2 );

                    extern char epicsDcuName[ DCU_COUNT ][ 40 ];
                    sprintf(
                        epicsDcuName[ ini_file_dcu_id ], "%.39s", slp - 2 );

                    buf[ strlen( buf ) - 4 ] = '.';
                }
            }
        }
    }
    fclose( mcf );

    // See if we have duplicate names
    {

        std::unordered_map< char*, int > m;

        for ( int i = 0; i < daqd.num_channels; i++ )
        {
            if ( m[ daqd.channels[ i ].name ] )
            {
                system_log( 1,
                            "Fatal error: channel `%s' is duplicated %d",
                            daqd.channels[ i ].name,
                            m[ daqd.channels[ i ].name ] );
                return 1;
            }
            m[ daqd.channels[ i ].name ] = i;
        }
    }

    /* Sort channels on the IFO ID and then on DCU ID */
    qsort( daqd.channels,
           daqd.num_channels,
           sizeof( daqd.channels[ 0 ] ),
           chan_dcu_eq );

    /* Update sequence number */
    for ( int i = 0; i < daqd.num_channels; i++ )
        daqd.channels[ i ].seq_num = i;

    /* Epics display */
    {
        int chan_count_total = daqd.num_channels -
            daqd.num_gds_channel_aliases - daqd.num_epics_channels;
        PV::set_pv( PV::PV_TOTAL_CHANS, chan_count_total );

        int chan_count_science = daqd.num_science_channels -
            daqd.num_gds_channel_aliases - daqd.num_epics_channels;
        PV::set_pv( PV::PV_SCIENCE_TOTAL_CHANS, chan_count_science );
    }
    system_log( 1, "finished configuring data channels" );
    return 0;
}
/// Channel configuration callback function.
chanConfigCallback( char* channel_name, struct CHAN_PARAM* params, void* user )
    if ( daqd.num_channels >= daqd_c::max_channels )
    {
        system_log(
            1, "Too many channels. Hard limit is %d", daqd_c::max_channels );
        return 0;
    }
    channel_t* ccd = &daqd.channels[ daqd.num_channels++ ];
    ccd->seq_num = daqd.num_channels - 1;
    ccd->id = 0;
    if ( params->dcuid >= DCU_COUNT || params->dcuid < 0 )
    {
        system_log(
            1, "channel `%s' has bad DCU id %d", ccd->name, params->dcuid );
        return 0;
    }
    ccd->dcu_id = params->dcuid;
    ini_file_dcu_id = params->dcuid;

    if ( params->ifoid == 0 || params->ifoid == 1 )
        ccd->ifoid = 0; // The 4K Ifo
    else
        ccd->ifoid = 1; // The 2K Ifo

    // We use rm id now to set the system
    ccd->tp_node = params->rmid;
    // printf("channel %s has node id %d\n", channel_name, ccd -> tp_node);

    strncpy( ccd->name, channel_name, channel_t::channel_name_max_len - 1 );
    ccd->name[ channel_t::channel_name_max_len - 1 ] = 0;
    ccd->chNum = params->chnnum;
    ccd->bps = daqd_c::data_type_size( params->datatype );
    ccd->data_type = (daq_data_t)params->datatype;
    ccd->sample_rate = params->datarate;

    //  ccd -> rm_offset = bsw(dinfo -> dataOffset) + sizeof (int);
    //  ccd -> rm_block_size = bsw(mmap -> dataBlockSize);

    // Activate channels for saving into full frames
    // Do not save 1 Hz slow channels
    ccd->active = 0;
    if ( ccd->sample_rate > 1 )
    {
        ccd->active = params->acquire;
    }
    if ( ccd->active )
    {
        daqd.num_active_channels++;
        if ( ccd->active & 2 )
        {
            daqd.num_science_channels++;
        }
    }

    // 1Hz channels will be acquired at 16Hz
    if ( ccd->sample_rate == 1 )
    {
        ccd->sample_rate = 16;
    }

    ccd->group_num = 0;

    //  ccd -> rm_dinfo = (dataInfoStr *) dinfo;
    // GDS_CHANNEL --> 1
    // GDS_ALIAS   --> 2
    // If channel (test point) number is specified, then this is an alias
    // channel
    if ( IS_TP_DCU( ccd->dcu_id ) )
        ccd->gds = params->testpoint ? 2 : 1;
    // printf("channel %s has gds=%d\n", ccd -> name, ccd -> gds);

    // GDS channels not trended
    if ( IS_GDS_ALIAS( *ccd ) || IS_GDS_SIGNAL( *ccd ) )
        ccd->trend = 0;
    else
        ccd->trend = ccd->active; // Trend all active channels;

    if ( IS_GDS_ALIAS( *ccd ) )
    {
        ccd->active = 0;
        daqd.num_gds_channel_aliases++;
    }
    else
    {
        if ( IS_GDS_SIGNAL( *ccd ) )
            daqd.num_gds_channels++;
    }

    // assign conversion data
    ccd->signal_gain = params->gain;
    ccd->signal_slope = params->slope;
    ccd->signal_offset = params->offset;
    strncpy(
        ccd->signal_units, params->units, channel_t::engr_unit_max_len - 1 );
    ccd->signal_units[ channel_t::engr_unit_max_len - 1 ] = 0;

    // set DCU rate
    extern int default_dcu_rate;
    daqd.dcuRate[ ccd->ifoid ][ ccd->dcu_id ] = default_dcu_rate;
    // printf("dcu %d rate %d\n", ccd -> dcu_id, default_dcu_rate);
    return 1;
void
daqd_c::update_configuration_number( const char* source_address )
    if ( _configuration_number != 0 || !source_address )
        return;
    if ( num_channels == 0 )
        return;

    FrameCPP::Common::MD5Sum check_sum;

    channel_t* cur = channels;
    channel_t* end = channels + num_channels;
    for ( ; cur < end; ++cur )
    {

        check_sum.Update( &( cur->chNum ), sizeof( cur->chNum ) );
        check_sum.Update( &( cur->seq_num ), sizeof( cur->seq_num ) );
        size_t name_len = strnlen( cur->name, channel_t::channel_name_max_len );
        check_sum.Update( cur->name, sizeof( name_len ) );
        check_sum.Update( &( cur->sample_rate ), sizeof( cur->sample_rate ) );
        check_sum.Update( &( cur->active ), sizeof( cur->active ) );
        check_sum.Update( &( cur->trend ), sizeof( cur->trend ) );
        check_sum.Update( &( cur->group_num ), sizeof( cur->group_num ) );
        check_sum.Update( &( cur->bps ), sizeof( cur->bps ) );
        check_sum.Update( &( cur->dcu_id ), sizeof( cur->dcu_id ) );
        check_sum.Update( &( cur->data_type ), sizeof( cur->data_type ) );
        check_sum.Update( &( cur->signal_gain ), sizeof( cur->signal_gain ) );
        check_sum.Update( &( cur->signal_slope ), sizeof( cur->signal_slope ) );
        check_sum.Update( &( cur->signal_offset ),
                          sizeof( cur->signal_offset ) );
        size_t unit_len =
            strnlen( cur->signal_units, channel_t::engr_unit_max_len );
        check_sum.Update( cur->signal_units, sizeof( unit_len ) );
    }
    check_sum.Finalize( );

    std::ostringstream ss;
    ss << check_sum;
    std::string hash = ss.str( );
    _configuration_number =
        daqd_run_number::get_run_number( source_address, hash );
    system_log(
        0, "configuration/run number = %d", (int)_configuration_number );
    PV::set_pv( PV::PV_CONFIGURATION_NUMBER, _configuration_number );
/// Linear search for a channel group name in channel_groups array.
int
daqd_c::find_channel_group( const char* channel_name )
    for ( int i = 0; i < num_channel_groups; i++ )
    {
        if ( !strncasecmp( channel_groups[ i ].name,
                           channel_name,
                           strlen( channel_groups[ i ].name ) ) )
            return channel_groups[ i ].num;
    }
    return 0;
/// Create full resolution frame object.
daqd_c::full_frame( int                frame_length_seconds,
                    int                science,
                    adc_data_ptr_type& dptr )
    unsigned long nchans = 0;

    if ( science )
    {
        nchans = num_science_channels;
    }
    else
    {
        nchans = num_active_channels;
    }
    FrameCPP::Version::FrAdcData* adc =
        new FrameCPP::Version::FrAdcData[ nchans ];
    FrameCPP::Version::FrameH::rawData_type rawData =
        FrameCPP::Version::FrameH::rawData_type(
            new FrameCPP::Version::FrRawData );
    ldas_frame_h_type frame;
    FrameCPP::Version::FrHistory history(
        "", 0, "framebuilder, framecpp-" + string( LDAS_VERSION ) );
    FrameCPP::Version::FrHistory history(
        "", 0, "framebuilder, framecpp-" + string( FRAMECPP_VERSION ) );
    FrameCPP::Version::FrHistory history(
        "", 0, "framebuilder, framecpp-unknown" );
    FrameCPP::Version::FrDetector detector = daqd.getDetector1( );

    // Create frame
    //
    try
    {
        frame = ldas_frame_h_type( new FrameCPP::Version::FrameH(
            "LIGO",
            configuration_number( ), // run number ??? buffpt -r> block_prop
                                     // (nb) -> prop.run;
            1, // frame number
            FrameCPP::Version::GPSTime( 0, 0 ),
            0, // leap seconds
            frame_length_seconds // dt
            ) );
        frame->RefDetectProc( ).append( detector );

        // Append second detector if it is defined
        if ( daqd.detector_name1.length( ) > 0 )
        {
            FrameCPP::Version::FrDetector detector1 = daqd.getDetector2( );
            frame->RefDetectProc( ).append( detector1 );
        }

        frame->SetRawData( rawData );
        frame->RefHistory( ).append( history );
    }
    catch ( bad_alloc )
    {
        system_log( 1, "Couldn't create full frame" );
        // shutdown_server ();
        // return NULL;
        abort( );
    }

    // Create ADCs
    try
    {
        // Fast channels
        unsigned int cur_chn = 0;

        for ( int i = 0; i < num_channels; i++ )
        {
            // Skip chanels we don't want to save
            if ( science ? 0 == ( channels[ i ].active & 2 )
                         : !channels[ i ].active )
                continue;

            FrameCPP::Version::FrAdcData adc = FrameCPP::Version::FrAdcData(
                std::string( channels[ i ].name ),
                channels[ i ].group_num,
                i, // channel ???
                CHAR_BIT * channels[ i ].bps,
                channels[ i ].sample_rate,
                channels[ i ].signal_offset,
                channels[ i ].signal_slope,
                std::string( channels[ i ].signal_units ),
                channels[ i ].data_type == _32bit_complex
                    ? channels[ i ].signal_gain
                    : .0, /* Freq shift */
                0,
                0,
                .0 ); /* heterodyning phase in radians */

            if ( channels[ i ].sample_rate > 16 )
            {
                /* Append ADC AUX vector to store 16 status words per second */
                FrameCPP::Version::Dimension aux_dims[ 1 ] = {
                    FrameCPP::Version::Dimension(
                        16 * frame_length_seconds, 1. / 16, "" )
                };
                FrameCPP::Version::FrVect* aux_vect =
                    new FrameCPP::Version::FrVect(
                        "dataValid",
                        1,
                        aux_dims,
                        new INT_2S[ 16 * frame_length_seconds ],
                        "" );
                adc.RefAux( ).append( *aux_vect );
            }

            /* Append ADC data vector */
            INT_4U nx = channels[ i ].sample_rate * frame_length_seconds;
            FrameCPP::Version::Dimension dims[ 1 ] = {
                FrameCPP::Version::Dimension(
                    nx, 1. / channels[ i ].sample_rate, "time" )
            };
            FrameCPP::Version::FrVect* vect;
            switch ( channels[ i ].data_type )
            {
            case _32bit_complex:
            {
                vect = new FrameCPP::Version::FrVect(
                    std::string( channels[ i ].name ),
                    1,
                    dims,
                    new COMPLEX_8[ nx ],
                    std::string( channels[ i ].signal_units ) );
                break;
            }
            case _64bit_double:
            {
                vect = new FrameCPP::Version::FrVect(
                    std::string( channels[ i ].name ),
                    1,
                    dims,
                    new REAL_8[ nx ],
                    std::string( channels[ i ].signal_units ) );
                break;
            }
            case _32bit_float:
            {
                vect = new FrameCPP::Version::FrVect(
                    std::string( channels[ i ].name ),
                    1,
                    dims,
                    new REAL_4[ nx ],
                    std::string( channels[ i ].signal_units ) );
                break;
            }
            case _32bit_integer:
            {
                vect = new FrameCPP::Version::FrVect(
                    std::string( channels[ i ].name ),
                    1,
                    dims,
                    new INT_4S[ nx ],
                    std::string( channels[ i ].signal_units ) );
                break;
            }
            case _32bit_uint:
            {
                vect = new FrameCPP::Version::FrVect(
                    std::string( channels[ i ].name ),
                    1,
                    dims,
                    new INT_4U[ nx ],
                    std::string( channels[ i ].signal_units ) );
                break;
            }
            case _64bit_integer:
            {
                abort( );
            }
            default:
            {
                vect = new FrameCPP::Version::FrVect(
                    std::string( channels[ i ].name ),
                    1,
                    dims,
                    new INT_2S[ nx ],
                    channels[ i ].signal_units );
                break;
            }
            }
            adc.RefData( ).append( *vect );
            frame->GetRawData( )->RefFirstAdc( ).append( adc );
            unsigned char* dptr_fast_data = frame->GetRawData( )
                                                ->RefFirstAdc( )[ cur_chn ]
                                                ->RefData( )[ 0 ]
                                                ->GetData( )
                                                .get( );
            INT_2U* dptr_aux_data = 0;
            if ( channels[ i ].sample_rate > 16 )
            {
                dptr_aux_data = (INT_2U*)frame->GetRawData( )
                                    ->RefFirstAdc( )[ cur_chn ]
                                    ->RefAux( )[ 0 ]
                                    ->GetData( )
                                    .get( );
            }
            dptr.push_back( pair< unsigned char*, INT_2U* >( dptr_fast_data,
                                                             dptr_aux_data ) );
            cur_chn++;
        }
    }
    catch ( bad_alloc )
    {
        system_log( 1, "Couldn't create ADC channel data" );
        // delete frame;
        //    shutdown_server ();
        // return NULL;
        abort( );
    }

    return frame;
/// IO Thread for the full resolution frame saver
/// This is the thread that does the actual writing
daqd_c::framer_io( shared_frame_work_queue_ptr _work_queue, int science )
    const int   STATE_NORMAL = 0;
    const int   STATE_WRITING = 1;
    const int   STATE_BROADCAST = 2;
    bool        shmem_bcast_frame = true;
    bool        dump_bcast_frame = false;
    std::string long_lived_debug_frame = "";

    shmem_bcast_frame = parameters( ).get< int >( "GDS_BROADCAST", 0 ) == 1;
        parameters( ).get< int >( "DUMP_PERIODIC_BROADCAST_FRAME", 0 ) == 1;

    if ( science )
    {
        daqd_c::set_thread_priority( "Science frame saver IO",
                                     "dqscifrio",
                                     SAVER_THREAD_PRIORITY,
                                     SCIENCE_SAVER_IO_CPUAFFINITY );
    }
    else
    {
        daqd_c::set_thread_priority( "Frame saver IO",
                                     "dqfulfrio",
                                     SAVER_THREAD_PRIORITY,
                                     FULL_SAVER_IO_CPUAFFINITY );
    }
    enum PV::PV_NAME epics_state_var =
        ( science ? PV::PV_SCIENCE_FW_STATE : PV::PV_RAW_FW_STATE );

    PV::set_pv( epics_state_var, STATE_NORMAL );
    for ( long frame_cntr = 0;; frame_cntr++ )
    {
        framer_buf* cur_buf = _work_queue->get_from_queue( 1 );

        DEBUG( 1,
               cerr << "About to write " << ( science ? "science" : "full" )
                    << " frame @" << cur_buf->gps << endl );
        if ( science )
        {
            cur_buf->dir_num = science_fsd.getDirFileNames( cur_buf->gps,
                                                            cur_buf->_tmpf,
                                                            cur_buf->tmpf,
                                                            frames_per_file,
                                                            blocks_per_frame );
        }
        else
        {
            cur_buf->dir_num = fsd.getDirFileNames( cur_buf->gps,
                                                    cur_buf->_tmpf,
                                                    cur_buf->tmpf,
                                                    frames_per_file,
                                                    blocks_per_frame );
        }

        int fd = creat( cur_buf->_tmpf, 0644 );
        if ( fd < 0 )
        {
            system_log(
                1,
                "Couldn't open full frame file `%s' for writing; errno %d",
                cur_buf->_tmpf,
                errno );
            if ( science )
            {
                science_fsd.report_lost_frame( );
            }
            else
            {
                fsd.report_lost_frame( );
            }
            set_fault( );
        }
        else
        {
            close( fd );
            /*try*/
            {

                PV::set_pv( epics_state_var, STATE_WRITING );
                time_t t = 0;
                {
                    FrameCPP::Common::MD5SumFilter            md5filter;
                    FrameCPP::Common::FrameBuffer< filebuf >* obuf =
                        new FrameCPP::Common::FrameBuffer< std::filebuf >(
                            std::ios::out );
                    obuf->open( cur_buf->_tmpf,
                                std::ios::out | std::ios::binary );
                    obuf->FilterAdd( &md5filter );
                    FrameCPP::Common::OFrameStream ofs( obuf );
                    ofs.SetCheckSumFile( FrameCPP::Common::CheckSum::CRC );
                    DEBUG( 1, cerr << "Begin WriteFrame()" << endl );
                    t = time( 0 );
                    ofs.WriteFrame(
                        cur_buf->frame,
                        // FrameCPP::Version::FrVect::GZIP, 1,
                        daqd.no_compression
                            ? FrameCPP::FrVect::RAW
                            : FrameCPP::FrVect::ZERO_SUPPRESS_OTHERWISE_GZIP,
                        1,
                        // FrameCPP::Compression::MODE_ZERO_SUPPRESS_SHORT,
                        // FrameCPP::Version::FrVect::DEFAULT_GZIP_LEVEL, /* 6
                        // */
                        FrameCPP::Common::CheckSum::CRC );

                    ofs.Close( );
                    obuf->close( );
                    md5filter.Finalize( );

                    queue_frame_checksum(
                        cur_buf->tmpf,
                        ( science ? daqd_c::science_frame : daqd_c::raw_frame ),
                        md5filter );

                    PV::set_pv( ( science ? PV::PV_SCIENCE_FRAME_CHECK_SUM_TRUNC
                                          : PV::PV_FRAME_CHECK_SUM_TRUNC ),
                                *reinterpret_cast< const unsigned int* >(
                                    md5filter.Value( ) ) );
                }
                t = time( 0 ) - t;
                PV::set_pv( epics_state_var, STATE_NORMAL );
                DEBUG( 1,
                       cerr << ( science ? "Science" : "Full" )
                            << " frame done in " << t << " seconds" << endl );
                /* Record frame write time */
                if ( science )
                {
                    PV::set_pv( PV::PV_SCIENCE_FRAME_WRITE_SEC, t );
                }
                else
                {
                    PV::set_pv( PV::PV_FRAME_WRITE_SEC, t );
                }

                if ( rename( cur_buf->_tmpf, cur_buf->tmpf ) )
                {
                    system_log( 1, "failed to rename file; errno %d", errno );
                    if ( science )
                    {
                        science_fsd.report_lost_frame( );
                    }
                    else
                    {
                        fsd.report_lost_frame( );
                    }
                    set_fault( );
                }
                else
                {

                    DEBUG( 3,
                           cerr << "frame " << frame_cntr << "("
                                << cur_buf->frame_number << ") is written out"
                                << endl );
                    // Successful frame write
                    if ( science )
                    {
                        science_fsd.update_dir(
                            cur_buf->gps,
                            cur_buf->gps_n,
                            cur_buf->frame_file_length_seconds,
                            cur_buf->dir_num );
                    }
                    else
                    {
                        fsd.update_dir( cur_buf->gps,
                                        cur_buf->gps_n,
                                        cur_buf->frame_file_length_seconds,
                                        cur_buf->dir_num );
                    }

                    // Report frame size to the Epics world
                    fd = open( cur_buf->tmpf, O_RDONLY );
                    if ( fd == -1 )
                    {
                        system_log( 1, "failed to open file; errno %d", errno );
                        exit( 1 );
                    }
                    struct stat sb;
                    if ( fstat( fd, &sb ) == -1 )
                    {
                        system_log(
                            1, "failed to fstat file; errno %d", errno );
                        exit( 1 );
                    }
                    if ( science )
                        PV::set_pv( PV::PV_SCIENCE_FRAME_SIZE, sb.st_size );
                    else
                        PV::set_pv( PV::PV_FRAME_SIZE, sb.st_size );
                    close( fd );
                }

                // Update the EPICS_SAVED value
                if ( !science )
                {
                    PV::set_pv( PV::PV_CHANS_SAVED, cur_buf->nac );
                }
                else
                {
                    PV::set_pv( PV::PV_SCIENCE_CHANS_SAVED, cur_buf->nac );
                }

                if ( shmem_bcast_frame )
                {
                    // We are compiled to be a DMT broadcaster
                    //
                    fd = open( cur_buf->tmpf, O_RDONLY );
                    if ( fd == -1 )
                    {
                        system_log( 1, "failed to open file; errno %d", errno );
                        exit( 1 );
                    }
                    struct stat sb;
                    if ( fstat( fd, &sb ) == -1 )
                    {
                        system_log(
                            1, "failed to fstat file; errno %d", errno );
                        exit( 1 );
                    }
                    void* addr =