Newer
Older
/* daqd.cc - Main daqd source code file */
#include <config.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <string>
#include <iostream>
#ifndef DAQD_CPP11
#error DAQD_CPP11 must be defined
#include <fstream>
#include <memory>
#include "framecpp/Common/MD5SumFilter.hh"
#include "run_number_client.hh"
#include "daqmap.h"
extern "C" {
#include "crc.h"
#include "param.h"
}
using namespace std;
#include "circ.hh"
#include "y.tab.h"
#include "FlexLexer.h"
#include "channel.hh"
#include "daqc.h"
#include "daqd.hh"
#include "sing_list.hh"
#include "net_writer.hh"
#include <stdio.h>
#include <stdarg.h>
#include <time.h>
#include <string.h>
/// Helper function to deal with the archive channels.
daqd_c::configure_archive_channels( char* archive_name,
char* file_name,
char* f1 )
{
archive_c* arc = 0;
// Find archive
for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
{
archive_c* a = (archive_c*)clink;
if ( !strcmp( a->fsd.get_path( ), archive_name ) )
arc = a;
}
if ( !arc )
return DAQD_NOT_FOUND;
// Read config file
int res = arc->load_config( file_name );
if ( res == 0 && f1 )
res = arc->load_old_config( f1 );
return res ? DAQD_NOT_FOUND : DAQD_OK;
/// Remove an archive from the list of known archives
/// :TODO: have to use locking here to avoid deleteing "live" archives
daqd_c::delete_archive( char* name )
{
for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
{
archive_c* a = (archive_c*)clink;
if ( !strcmp( a->fsd.get_path( ), name ) )
{
archive.remove( clink );
return DAQD_OK;
}
}
return DAQD_NOT_FOUND;
/// Create new archive if it's not created already
/// Set archive's prefix, suffix, number of Data dirs; scan archive file names
daqd_c::scan_archive( char* name, char* prefix, char* suffix, int ndirs )
{
archive_c* arc = 0;
for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
{
archive_c* a = (archive_c*)clink;
if ( !strcmp( a->fsd.get_path( ), name ) )
arc = a;
}
if ( !arc )
{
void* mptr = malloc( sizeof( archive_c ) );
if ( !mptr )
return DAQD_MALLOC;
archive.insert( arc = new ( mptr ) archive_c( ) );
}
return arc->scan( name, prefix, suffix, ndirs );
/// Update file directory info for the archive
/// Called when new file is written into the archive by an external archive
/// writer
daqd_c::update_archive( char* name,
unsigned long gps,
unsigned long dt,
unsigned int dir_num )
{
archive_c* arc = 0;
for ( s_link* clink = archive.first( ); clink; clink = clink->next( ) )
{
archive_c* a = (archive_c*)clink;
if ( !strcmp( a->fsd.get_path( ), name ) )
arc = a;
}
if ( !arc )
return DAQD_NOT_FOUND;
int res = arc->fsd.update_dir( gps, 0, dt, dir_num );
if ( res )
return DAQD_MALLOC;
else
return DAQD_OK;
extern void* interpreter_no_prompt( void* );
int shutdown_server( );
/// Server shutdown flag
bool server_is_shutting_down = false;
daqd_c daqd; ///< root object
/// Is set to the program's executable name during run time
char* programname;
/// Mutual exclusion on Frames library calls
pthread_mutex_t framelib_lock;
#ifndef NDEBUG
/// Controls volume of the debugging messages that is printed out
int _debug = 10;
#endif
/// Controls volume of log messages
int _log_level;
//#include "../../src/drv/param.c"
struct cmp_struct
{
bool
operator( )( char* a, char* b )
{
return !strcmp( a, b );
}
};
/// Sort on IFO number and DCU id
/// Do not change channel order within a DCU
int
chan_dcu_eq( const void* a, const void* b )
{
unsigned int dcu1, dcu2;
dcu1 = ( (channel_t*)a )->dcu_id + DCU_COUNT * ( (channel_t*)a )->ifoid;
dcu2 = ( (channel_t*)b )->dcu_id + DCU_COUNT * ( (channel_t*)b )->ifoid;
if ( dcu1 == dcu2 )
return ( (channel_t*)a )->seq_num - ( (channel_t*)b )->seq_num;
else
return dcu1 - dcu2;
/// DCU id of the current configuration file (ini file)
int ini_file_dcu_id = 0;
/// Broadcast channel configuration callback function.
int
bcstConfigCallback( char* name, struct CHAN_PARAM* parm, void* user )
{
printf( "Broadcast channel %s configured\n", name );
daqd.broadcast_set.insert( name );
return 1;
/// Remove spaces in place
void
RemoveSpaces( char* source )
char* i = source;
char* j = source;
while ( *j != 0 )
{
*i = *j++;
if ( *i != ' ' )
i++;
}
*i = 0;
/// Configure data channel info from config files
daqd_c::configure_channels_files( )
// See if we have configured broadcast channel file
// where the set of channels to broadcast to the DMT is specified
//
if ( broadcast_config.compare( "" ) )
{
unsigned long crc = 0;
if ( 0 ==
parseConfigFile( (char*)broadcast_config.c_str( ),
&crc,
bcstConfigCallback,
0,
0,
0 ) )
{
printf( "Failed to parse broadcast config file %s\n",
broadcast_config.c_str( ) );
return 1;
}
}
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// error message buffer
char errmsgbuf[ 80 ];
// File names are specified in `master_config' file
FILE* mcf = NULL;
mcf = fopen( master_config.c_str( ), "r" );
if ( mcf == NULL )
{
strerror_r( errno, errmsgbuf, sizeof( errmsgbuf ) );
system_log( 1,
"failed to open `%s' for reading: %s",
master_config.c_str( ),
errmsgbuf );
return 1;
}
num_channels = 0;
num_active_channels = 0;
num_science_channels = 0;
memset( channels, 0, sizeof( channels[ 0 ] ) * daqd_c::max_channels );
for ( ;; )
{
unsigned long crc = 0;
int chanConfigCallback( char*, struct CHAN_PARAM*, void* user );
int testpoint = 0;
char buf[ 1024 ];
char* c = fgets( buf, 1024, mcf );
if ( feof( mcf ) )
break;
if ( *buf == '#' )
continue;
RemoveSpaces( buf );
if ( strlen( buf ) > 0 )
{
if ( buf[ strlen( buf ) - 1 ] == '\n' )
buf[ strlen( buf ) - 1 ] = 0;
}
if ( strlen( buf ) == 0 )
continue;
if ( strlen( buf ) > 4 )
{
testpoint = !strcmp( buf + strlen( buf ) - 4, ".par" );
}
ini_file_dcu_id = 0;
if ( 0 ==
parseConfigFile( buf, &crc, chanConfigCallback, testpoint, 0, 0 ) )
{
printf( "Failed to parse config file %s\n", buf );
return 1;
}
// DEBUG(1, cerr << "Channel config: dcu " <<
// daqd.channels[daqd.num_channels - 1].dcu_id << " crc=0x" << hex <<
// crc
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
// << dec << endl); printf("%s has dcuid=%d\n", buf, ini_file_dcu_id);
if ( daqd.num_channels )
{
daqd.dcuConfigCRC[ daqd.channels[ daqd.num_channels - 1 ].ifoid ]
[ daqd.channels[ daqd.num_channels - 1 ].dcu_id ] =
crc;
}
if ( ini_file_dcu_id > 0 && ini_file_dcu_id < DCU_COUNT )
{
// only set DCU name if this is an INI file (*.ini)
if ( !strcmp( buf + strlen( buf ) - 4, ".ini" ) )
{
char* slp = strrchr( buf, '/' );
if ( slp )
{
slp += 3;
buf[ strlen( buf ) - 4 ] = 0;
sprintf( daqd.dcuName[ ini_file_dcu_id ], "%.31s", slp );
sprintf(
daqd.fullDcuName[ ini_file_dcu_id ], "%.31s", slp - 2 );
extern char epicsDcuName[ DCU_COUNT ][ 40 ];
sprintf(
epicsDcuName[ ini_file_dcu_id ], "%.39s", slp - 2 );
buf[ strlen( buf ) - 4 ] = '.';
}
}
}
}
fclose( mcf );
// See if we have duplicate names
{
std::unordered_map< char*, int > m;
for ( int i = 0; i < daqd.num_channels; i++ )
{
if ( m[ daqd.channels[ i ].name ] )
{
system_log( 1,
"Fatal error: channel `%s' is duplicated %d",
daqd.channels[ i ].name,
m[ daqd.channels[ i ].name ] );
return 1;
}
m[ daqd.channels[ i ].name ] = i;
}
}
/* Sort channels on the IFO ID and then on DCU ID */
qsort( daqd.channels,
daqd.num_channels,
sizeof( daqd.channels[ 0 ] ),
chan_dcu_eq );
/* Update sequence number */
for ( int i = 0; i < daqd.num_channels; i++ )
daqd.channels[ i ].seq_num = i;
/* Epics display */
{
int chan_count_total = daqd.num_channels -
daqd.num_gds_channel_aliases - daqd.num_epics_channels;
PV::set_pv( PV::PV_TOTAL_CHANS, chan_count_total );
int chan_count_science = daqd.num_science_channels -
daqd.num_gds_channel_aliases - daqd.num_epics_channels;
PV::set_pv( PV::PV_SCIENCE_TOTAL_CHANS, chan_count_science );
}
system_log( 1, "finished configuring data channels" );
return 0;
}
/// Channel configuration callback function.
chanConfigCallback( char* channel_name, struct CHAN_PARAM* params, void* user )
if ( daqd.num_channels >= daqd_c::max_channels )
{
system_log(
1, "Too many channels. Hard limit is %d", daqd_c::max_channels );
return 0;
}
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
channel_t* ccd = &daqd.channels[ daqd.num_channels++ ];
ccd->seq_num = daqd.num_channels - 1;
ccd->id = 0;
if ( params->dcuid >= DCU_COUNT || params->dcuid < 0 )
{
system_log(
1, "channel `%s' has bad DCU id %d", ccd->name, params->dcuid );
return 0;
}
ccd->dcu_id = params->dcuid;
ini_file_dcu_id = params->dcuid;
if ( params->ifoid == 0 || params->ifoid == 1 )
ccd->ifoid = 0; // The 4K Ifo
else
ccd->ifoid = 1; // The 2K Ifo
// We use rm id now to set the system
ccd->tp_node = params->rmid;
// printf("channel %s has node id %d\n", channel_name, ccd -> tp_node);
strncpy( ccd->name, channel_name, channel_t::channel_name_max_len - 1 );
ccd->name[ channel_t::channel_name_max_len - 1 ] = 0;
ccd->chNum = params->chnnum;
ccd->bps = daqd_c::data_type_size( params->datatype );
ccd->data_type = (daq_data_t)params->datatype;
ccd->sample_rate = params->datarate;
// ccd -> rm_offset = bsw(dinfo -> dataOffset) + sizeof (int);
// ccd -> rm_block_size = bsw(mmap -> dataBlockSize);
// Activate channels for saving into full frames
// Do not save 1 Hz slow channels
ccd->active = 0;
if ( ccd->sample_rate > 1 )
{
ccd->active = params->acquire;
}
if ( ccd->active )
{
daqd.num_active_channels++;
if ( ccd->active & 2 )
{
daqd.num_science_channels++;
}
}
// 1Hz channels will be acquired at 16Hz
if ( ccd->sample_rate == 1 )
{
ccd->sample_rate = 16;
}
ccd->group_num = 0;
// ccd -> rm_dinfo = (dataInfoStr *) dinfo;
// GDS_CHANNEL --> 1
// GDS_ALIAS --> 2
// If channel (test point) number is specified, then this is an alias
// channel
if ( IS_TP_DCU( ccd->dcu_id ) )
ccd->gds = params->testpoint ? 2 : 1;
// printf("channel %s has gds=%d\n", ccd -> name, ccd -> gds);
// GDS channels not trended
if ( IS_GDS_ALIAS( *ccd ) || IS_GDS_SIGNAL( *ccd ) )
ccd->trend = 0;
else
ccd->trend = ccd->active; // Trend all active channels;
if ( IS_GDS_ALIAS( *ccd ) )
{
ccd->active = 0;
daqd.num_gds_channel_aliases++;
}
else
{
if ( IS_GDS_SIGNAL( *ccd ) )
daqd.num_gds_channels++;
}
// assign conversion data
ccd->signal_gain = params->gain;
ccd->signal_slope = params->slope;
ccd->signal_offset = params->offset;
strncpy(
ccd->signal_units, params->units, channel_t::engr_unit_max_len - 1 );
ccd->signal_units[ channel_t::engr_unit_max_len - 1 ] = 0;
// set DCU rate
extern int default_dcu_rate;
daqd.dcuRate[ ccd->ifoid ][ ccd->dcu_id ] = default_dcu_rate;
// printf("dcu %d rate %d\n", ccd -> dcu_id, default_dcu_rate);
return 1;
void
daqd_c::update_configuration_number( const char* source_address )
if ( _configuration_number != 0 || !source_address )
return;
if ( num_channels == 0 )
return;
FrameCPP::Common::MD5Sum check_sum;
channel_t* cur = channels;
channel_t* end = channels + num_channels;
for ( ; cur < end; ++cur )
{
check_sum.Update( &( cur->chNum ), sizeof( cur->chNum ) );
check_sum.Update( &( cur->seq_num ), sizeof( cur->seq_num ) );
size_t name_len = strnlen( cur->name, channel_t::channel_name_max_len );
check_sum.Update( cur->name, sizeof( name_len ) );
check_sum.Update( &( cur->sample_rate ), sizeof( cur->sample_rate ) );
check_sum.Update( &( cur->active ), sizeof( cur->active ) );
check_sum.Update( &( cur->trend ), sizeof( cur->trend ) );
check_sum.Update( &( cur->group_num ), sizeof( cur->group_num ) );
check_sum.Update( &( cur->bps ), sizeof( cur->bps ) );
check_sum.Update( &( cur->dcu_id ), sizeof( cur->dcu_id ) );
check_sum.Update( &( cur->data_type ), sizeof( cur->data_type ) );
check_sum.Update( &( cur->signal_gain ), sizeof( cur->signal_gain ) );
check_sum.Update( &( cur->signal_slope ), sizeof( cur->signal_slope ) );
check_sum.Update( &( cur->signal_offset ),
sizeof( cur->signal_offset ) );
size_t unit_len =
strnlen( cur->signal_units, channel_t::engr_unit_max_len );
check_sum.Update( cur->signal_units, sizeof( unit_len ) );
}
check_sum.Finalize( );
std::ostringstream ss;
ss << check_sum;
std::string hash = ss.str( );
_configuration_number =
daqd_run_number::get_run_number( source_address, hash );
system_log(
0, "configuration/run number = %d", (int)_configuration_number );
PV::set_pv( PV::PV_CONFIGURATION_NUMBER, _configuration_number );
/// Linear search for a channel group name in channel_groups array.
int
daqd_c::find_channel_group( const char* channel_name )
for ( int i = 0; i < num_channel_groups; i++ )
{
if ( !strncasecmp( channel_groups[ i ].name,
channel_name,
strlen( channel_groups[ i ].name ) ) )
return channel_groups[ i ].num;
}
return 0;
/// Create full resolution frame object.
ldas_frame_h_type
daqd_c::full_frame( int frame_length_seconds,
int science,
adc_data_ptr_type& dptr )
unsigned long nchans = 0;
if ( science )
{
nchans = num_science_channels;
}
else
{
nchans = num_active_channels;
}
FrameCPP::Version::FrAdcData* adc =
new FrameCPP::Version::FrAdcData[ nchans ];
FrameCPP::Version::FrameH::rawData_type rawData =
FrameCPP::Version::FrameH::rawData_type(
new FrameCPP::Version::FrRawData );
ldas_frame_h_type frame;
Keith Thorne
committed
#if USE_LDAS_VERSION
FrameCPP::Version::FrHistory history(
"", 0, "framebuilder, framecpp-" + string( LDAS_VERSION ) );
Keith Thorne
committed
#else
#if USE_FRAMECPP_VERSION
FrameCPP::Version::FrHistory history(
"", 0, "framebuilder, framecpp-" + string( FRAMECPP_VERSION ) );
Keith Thorne
committed
#else
FrameCPP::Version::FrHistory history(
"", 0, "framebuilder, framecpp-unknown" );
Keith Thorne
committed
#endif
#endif
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
FrameCPP::Version::FrDetector detector = daqd.getDetector1( );
// Create frame
//
try
{
frame = ldas_frame_h_type( new FrameCPP::Version::FrameH(
"LIGO",
configuration_number( ), // run number ??? buffpt -r> block_prop
// (nb) -> prop.run;
1, // frame number
FrameCPP::Version::GPSTime( 0, 0 ),
0, // leap seconds
frame_length_seconds // dt
) );
frame->RefDetectProc( ).append( detector );
// Append second detector if it is defined
if ( daqd.detector_name1.length( ) > 0 )
{
FrameCPP::Version::FrDetector detector1 = daqd.getDetector2( );
frame->RefDetectProc( ).append( detector1 );
}
frame->SetRawData( rawData );
frame->RefHistory( ).append( history );
}
catch ( bad_alloc )
{
system_log( 1, "Couldn't create full frame" );
// shutdown_server ();
// return NULL;
abort( );
}
// Create ADCs
try
{
// Fast channels
unsigned int cur_chn = 0;
for ( int i = 0; i < num_channels; i++ )
{
// Skip chanels we don't want to save
if ( science ? 0 == ( channels[ i ].active & 2 )
: !channels[ i ].active )
continue;
FrameCPP::Version::FrAdcData adc = FrameCPP::Version::FrAdcData(
std::string( channels[ i ].name ),
channels[ i ].group_num,
i, // channel ???
CHAR_BIT * channels[ i ].bps,
channels[ i ].sample_rate,
channels[ i ].signal_offset,
channels[ i ].signal_slope,
std::string( channels[ i ].signal_units ),
channels[ i ].data_type == _32bit_complex
? channels[ i ].signal_gain
: .0, /* Freq shift */
0,
0,
.0 ); /* heterodyning phase in radians */
if ( channels[ i ].sample_rate > 16 )
{
/* Append ADC AUX vector to store 16 status words per second */
FrameCPP::Version::Dimension aux_dims[ 1 ] = {
FrameCPP::Version::Dimension(
16 * frame_length_seconds, 1. / 16, "" )
};
FrameCPP::Version::FrVect* aux_vect =
new FrameCPP::Version::FrVect(
"dataValid",
1,
aux_dims,
new INT_2S[ 16 * frame_length_seconds ],
"" );
adc.RefAux( ).append( *aux_vect );
}
/* Append ADC data vector */
INT_4U nx = channels[ i ].sample_rate * frame_length_seconds;
FrameCPP::Version::Dimension dims[ 1 ] = {
FrameCPP::Version::Dimension(
nx, 1. / channels[ i ].sample_rate, "time" )
};
FrameCPP::Version::FrVect* vect;
switch ( channels[ i ].data_type )
{
case _32bit_complex:
{
vect = new FrameCPP::Version::FrVect(
std::string( channels[ i ].name ),
1,
dims,
new COMPLEX_8[ nx ],
std::string( channels[ i ].signal_units ) );
break;
}
case _64bit_double:
{
vect = new FrameCPP::Version::FrVect(
std::string( channels[ i ].name ),
1,
dims,
new REAL_8[ nx ],
std::string( channels[ i ].signal_units ) );
break;
}
case _32bit_float:
{
vect = new FrameCPP::Version::FrVect(
std::string( channels[ i ].name ),
1,
dims,
new REAL_4[ nx ],
std::string( channels[ i ].signal_units ) );
break;
}
case _32bit_integer:
{
vect = new FrameCPP::Version::FrVect(
std::string( channels[ i ].name ),
1,
dims,
new INT_4S[ nx ],
std::string( channels[ i ].signal_units ) );
break;
}
case _32bit_uint:
{
vect = new FrameCPP::Version::FrVect(
std::string( channels[ i ].name ),
1,
dims,
new INT_4U[ nx ],
std::string( channels[ i ].signal_units ) );
break;
}
case _64bit_integer:
{
abort( );
}
default:
{
vect = new FrameCPP::Version::FrVect(
std::string( channels[ i ].name ),
1,
dims,
new INT_2S[ nx ],
channels[ i ].signal_units );
break;
}
}
adc.RefData( ).append( *vect );
frame->GetRawData( )->RefFirstAdc( ).append( adc );
unsigned char* dptr_fast_data = frame->GetRawData( )
->RefFirstAdc( )[ cur_chn ]
->RefData( )[ 0 ]
->GetData( )
.get( );
INT_2U* dptr_aux_data = 0;
if ( channels[ i ].sample_rate > 16 )
{
dptr_aux_data = (INT_2U*)frame->GetRawData( )
->RefFirstAdc( )[ cur_chn ]
->RefAux( )[ 0 ]
->GetData( )
.get( );
}
dptr.push_back( pair< unsigned char*, INT_2U* >( dptr_fast_data,
dptr_aux_data ) );
cur_chn++;
}
}
catch ( bad_alloc )
{
system_log( 1, "Couldn't create ADC channel data" );
// delete frame;
// shutdown_server ();
// return NULL;
abort( );
}
return frame;
/// IO Thread for the full resolution frame saver
/// This is the thread that does the actual writing
daqd_c::framer_io( shared_frame_work_queue_ptr _work_queue, int science )

Jonathan Hanks
committed
const int STATE_NORMAL = 0;
const int STATE_WRITING = 1;
const int STATE_BROADCAST = 2;
bool shmem_bcast_frame = true;
bool dump_bcast_frame = false;
std::string long_lived_debug_frame = "";
shmem_bcast_frame = parameters( ).get< int >( "GDS_BROADCAST", 0 ) == 1;

Jonathan Hanks
committed
dump_bcast_frame =

Jonathan Hanks
committed
parameters( ).get< int >( "DUMP_PERIODIC_BROADCAST_FRAME", 0 ) == 1;
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
if ( science )
{
daqd_c::set_thread_priority( "Science frame saver IO",
"dqscifrio",
SAVER_THREAD_PRIORITY,
SCIENCE_SAVER_IO_CPUAFFINITY );
}
else
{
daqd_c::set_thread_priority( "Frame saver IO",
"dqfulfrio",
SAVER_THREAD_PRIORITY,
FULL_SAVER_IO_CPUAFFINITY );
}
enum PV::PV_NAME epics_state_var =
( science ? PV::PV_SCIENCE_FW_STATE : PV::PV_RAW_FW_STATE );
PV::set_pv( epics_state_var, STATE_NORMAL );
for ( long frame_cntr = 0;; frame_cntr++ )
{
framer_buf* cur_buf = _work_queue->get_from_queue( 1 );
DEBUG( 1,
cerr << "About to write " << ( science ? "science" : "full" )
<< " frame @" << cur_buf->gps << endl );
if ( science )
{
cur_buf->dir_num = science_fsd.getDirFileNames( cur_buf->gps,
cur_buf->_tmpf,
cur_buf->tmpf,
frames_per_file,
blocks_per_frame );
}
else
{
cur_buf->dir_num = fsd.getDirFileNames( cur_buf->gps,
cur_buf->_tmpf,
cur_buf->tmpf,
frames_per_file,
blocks_per_frame );
}
int fd = creat( cur_buf->_tmpf, 0644 );
if ( fd < 0 )
{
system_log(
1,
"Couldn't open full frame file `%s' for writing; errno %d",
cur_buf->_tmpf,
errno );
if ( science )
{
science_fsd.report_lost_frame( );
}
else
{
fsd.report_lost_frame( );
}
set_fault( );
}
else
{
close( fd );
/*try*/
{
PV::set_pv( epics_state_var, STATE_WRITING );
time_t t = 0;
{
FrameCPP::Common::MD5SumFilter md5filter;
FrameCPP::Common::FrameBuffer< filebuf >* obuf =
new FrameCPP::Common::FrameBuffer< std::filebuf >(
std::ios::out );
obuf->open( cur_buf->_tmpf,
std::ios::out | std::ios::binary );
obuf->FilterAdd( &md5filter );
FrameCPP::Common::OFrameStream ofs( obuf );
ofs.SetCheckSumFile( FrameCPP::Common::CheckSum::CRC );
DEBUG( 1, cerr << "Begin WriteFrame()" << endl );
t = time( 0 );
ofs.WriteFrame(
cur_buf->frame,
// FrameCPP::Version::FrVect::GZIP, 1,
daqd.no_compression
? FrameCPP::FrVect::RAW
: FrameCPP::FrVect::ZERO_SUPPRESS_OTHERWISE_GZIP,
1,
// FrameCPP::Compression::MODE_ZERO_SUPPRESS_SHORT,
// FrameCPP::Version::FrVect::DEFAULT_GZIP_LEVEL, /* 6
// */
FrameCPP::Common::CheckSum::CRC );
ofs.Close( );
obuf->close( );
md5filter.Finalize( );
queue_frame_checksum(
cur_buf->tmpf,
( science ? daqd_c::science_frame : daqd_c::raw_frame ),
md5filter );
PV::set_pv( ( science ? PV::PV_SCIENCE_FRAME_CHECK_SUM_TRUNC
: PV::PV_FRAME_CHECK_SUM_TRUNC ),
*reinterpret_cast< const unsigned int* >(
md5filter.Value( ) ) );
}
t = time( 0 ) - t;
PV::set_pv( epics_state_var, STATE_NORMAL );
DEBUG( 1,
cerr << ( science ? "Science" : "Full" )
<< " frame done in " << t << " seconds" << endl );
/* Record frame write time */
if ( science )
{
PV::set_pv( PV::PV_SCIENCE_FRAME_WRITE_SEC, t );
}
else
{
PV::set_pv( PV::PV_FRAME_WRITE_SEC, t );
}
if ( rename( cur_buf->_tmpf, cur_buf->tmpf ) )
{
system_log( 1, "failed to rename file; errno %d", errno );
if ( science )
{
science_fsd.report_lost_frame( );
}
else
{
fsd.report_lost_frame( );
}
set_fault( );
}
else
{
DEBUG( 3,
cerr << "frame " << frame_cntr << "("
<< cur_buf->frame_number << ") is written out"
<< endl );
// Successful frame write
if ( science )
{
science_fsd.update_dir(
cur_buf->gps,
cur_buf->gps_n,
cur_buf->frame_file_length_seconds,
cur_buf->dir_num );
}
else
{
fsd.update_dir( cur_buf->gps,
cur_buf->gps_n,
cur_buf->frame_file_length_seconds,
cur_buf->dir_num );
}
// Report frame size to the Epics world
fd = open( cur_buf->tmpf, O_RDONLY );
if ( fd == -1 )
{
system_log( 1, "failed to open file; errno %d", errno );
exit( 1 );
}
struct stat sb;
if ( fstat( fd, &sb ) == -1 )
{
system_log(
1, "failed to fstat file; errno %d", errno );
exit( 1 );
}
if ( science )
PV::set_pv( PV::PV_SCIENCE_FRAME_SIZE, sb.st_size );
else
PV::set_pv( PV::PV_FRAME_SIZE, sb.st_size );
close( fd );
}
// Update the EPICS_SAVED value
if ( !science )
{
PV::set_pv( PV::PV_CHANS_SAVED, cur_buf->nac );
}
else
{
PV::set_pv( PV::PV_SCIENCE_CHANS_SAVED, cur_buf->nac );
}
if ( shmem_bcast_frame )
{
// We are compiled to be a DMT broadcaster
//
fd = open( cur_buf->tmpf, O_RDONLY );
if ( fd == -1 )
{
system_log( 1, "failed to open file; errno %d", errno );
exit( 1 );
}
struct stat sb;
if ( fstat( fd, &sb ) == -1 )
{
system_log(
1, "failed to fstat file; errno %d", errno );
exit( 1 );
}
void* addr =