diff --git a/src/daqd/CMakeLists.txt b/src/daqd/CMakeLists.txt index 9d72ab6f54a683267076342dd8fbf428fe19dde9..bac4bb6955f6955b8daaf3d5f9febca8e7d20946 100644 --- a/src/daqd/CMakeLists.txt +++ b/src/daqd/CMakeLists.txt @@ -113,7 +113,7 @@ add_executable(daqd daqd.cc comm_impl.cc checksum_crc32.cc - thread_launcher.cc + daqd_thread.cc ../include/daq_core.h ${CMAKE_CURRENT_BINARY_DIR}/comm.cc ${CMAKE_CURRENT_BINARY_DIR}/comm-lex.cc @@ -150,8 +150,8 @@ install(TARGETS daqd DESTINATION bin) add_executable(test_daqd_unit_tests tests/test_main.cc tests/test_daqd_cmask_t.cc tests/test_shmem_recv.cc - tests/test_thread_launcher.cc - thread_launcher.cc + tests/test_daqd_thread.cc + daqd_thread.cc ) target_include_directories(test_daqd_unit_tests PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ../include) target_link_libraries(test_daqd_unit_tests PUBLIC ${CMAKE_THREAD_LIBS_INIT}) diff --git a/src/daqd/circ.cc b/src/daqd/circ.cc index 460fb3b1b8b0907f2645a2a4f235a8968ea60ed4..78e1a473a5d82e6737bc94aa3ad4143f6bd9f9ac 100644 --- a/src/daqd/circ.cc +++ b/src/daqd/circ.cc @@ -15,6 +15,7 @@ using namespace std; #include <errno.h> #include <stdio.h> #include "circ.hh" +#include "raii.hh" #ifndef SCOPE_PROGRAM #include "daqd.hh" @@ -79,8 +80,8 @@ circ_buffer::buffer_malloc( int consumers, } // circ_buffer::circ_buffer (int consumers = 1, int blocks = 100, long -// block_size = 10240, time_t block_period = 1, mem_choice mem_flagp=flag_malloc, -// char *param1 = NULL) +// block_size = 10240, time_t block_period = 1, mem_choice +// mem_flagp=flag_malloc, char *param1 = NULL) circ_buffer::circ_buffer( int consumers, int blocks, long block_size, @@ -323,7 +324,7 @@ circ_buffer::put16th_dpscattered( struct put_dpvec* pv, memor4( dst + pv[ i ].dest_status_idx, &status ); } // *((int *)(dst + pv [i].dest_status_idx) + 1 + nbi16th) = - //status; + // status; memcpy( (char*)( dst + pv[ i ].dest_status_idx + sizeof( int ) * ( 1 + nbi16th ) ), &status, @@ -639,6 +640,50 @@ circ_buffer::get( int cnum ) return nbo; } +/* + Get next data block index for consumer number `cnum'. With a timeout at + absolute time ts. + + This will hang on while the block is not filled. + Must be followed as soon as possible by the call to circ_buffer::unlock(), + or producer will block. + + avi Tue Jan 6 11:05:45 PST 1998 + It seems that the race condition possible in the situation with one slow + consumer one fast consumer and fast producer -- fast producer will make full + circle on the buffer and catches up with the slow consumer; it will decrease + `busy' flag between slow consumer `get' and `unlock' calls. Slow consumer + then either sets `busy' negative or reads data from the buffer while producer + puts new block in. This could be managed by having `busy' flags for every + consumer. + **** fixed by using bit flags in `busy', one for each consumer *** + + returns -1 on timeout. +*/ +int +circ_buffer::timed_get( int cnum, timespec* ts ) +{ + int nbo; + + raii::lock_guard< pthread_mutex_t > l_( + pbuffer->block[ pbuffer->next_block_out[ cnum ] ].lock ); + + assert( invariant( ) ); + + nbo = pbuffer->next_block_out[ cnum ]; + while ( !( pbuffer->block[ nbo ].busy.get( cnum ) ) ) + { + if ( pthread_cond_timedwait( &pbuffer->block[ nbo ].notempty, + &pbuffer->block[ nbo ].lock, + ts ) == ETIMEDOUT ) + { + return -1; + } + } + + return nbo; +} + /* Nonblocking get */ diff --git a/src/daqd/circ.hh b/src/daqd/circ.hh index 08d6fd25a3f62ced1bbd2d2f99c2776cde550342..aa05fa64b5e6d4e74325fa3e8087d262fdcd63a9 100644 --- a/src/daqd/circ.hh +++ b/src/daqd/circ.hh @@ -131,6 +131,7 @@ public: int, circ_buffer_block_prop_t* a = 0 ); int get( int ); + int timed_get( int, timespec* ts); int get_nowait( int ); void unlock( int ); void unlock16th( int ); diff --git a/src/daqd/daqd.cc b/src/daqd/daqd.cc index 207d06c5e5f43169a2676a1993d3821a63e16727..da5a340502fce24b7159fd0c7346a91e0b79cdcb 100644 --- a/src/daqd/daqd.cc +++ b/src/daqd/daqd.cc @@ -55,7 +55,7 @@ using namespace std; #include "daqd.hh" #include "sing_list.hh" #include "net_writer.hh" -#include "thread_launcher.hh" +#include "daqd_thread.hh" #include <stdio.h> #include <stdarg.h> diff --git a/src/daqd/thread_launcher.cc b/src/daqd/daqd_thread.cc similarity index 69% rename from src/daqd/thread_launcher.cc rename to src/daqd/daqd_thread.cc index 15622747a2a9de66411540f8eadd937af5821f76..3fd5bf5e97395850638e83e19c0421da2cc854cd 100644 --- a/src/daqd/thread_launcher.cc +++ b/src/daqd/daqd_thread.cc @@ -1,9 +1,11 @@ // // Created by jonathan.hanks on 3/26/20. // -#include "thread_launcher.hh" +#include "daqd_thread.hh" #include "raii.hh" +#include <algorithm> + namespace { /*! @@ -23,8 +25,8 @@ namespace void* thread_trampoline( void* arg ) { - std::unique_ptr< thread_handler_t > arg_ptr{ - reinterpret_cast< thread_handler_t* >( arg ) + std::unique_ptr< thread_action_t > arg_ptr{ + reinterpret_cast< thread_action_t* >( arg ) }; ( *arg_ptr )( ); return nullptr; @@ -34,10 +36,10 @@ namespace int launch_pthread( pthread_t& tid, const pthread_attr_t& attr, - thread_handler_t handler ) + thread_action_t handler ) { auto arg_ptr = - raii::make_unique_ptr< thread_handler_t >( std::move( handler ) ); + raii::make_unique_ptr< thread_action_t >( std::move( handler ) ); auto result = pthread_create( &tid, &attr, thread_trampoline, @@ -48,4 +50,20 @@ launch_pthread( pthread_t& tid, arg_ptr.release( ); } return result; +} + +thread_handler_t::~thread_handler_t( ) +{ + clear(); +} + +void +thread_handler_t::clear() +{ + stopper_( ); + std::lock_guard< std::mutex > l_{ m_ }; + std::for_each( + thread_ids_.begin( ), thread_ids_.end( ), []( pthread_t& cur_tid ) { + pthread_join( cur_tid, nullptr ); + } ); } \ No newline at end of file diff --git a/src/daqd/daqd_thread.hh b/src/daqd/daqd_thread.hh new file mode 100644 index 0000000000000000000000000000000000000000..f417527677d4347ac8a6170c866725e25d71fb90 --- /dev/null +++ b/src/daqd/daqd_thread.hh @@ -0,0 +1,198 @@ +// +// Created by jonathan.hanks on 3/26/20. +// + +#ifndef DAQD_TRUNK_DAQD_THREAD_HH +#define DAQD_TRUNK_DAQD_THREAD_HH + +#include <functional> +#include <mutex> +#include <vector> + +#include <pthread.h> + +using thread_action_t = std::function< void( void ) >; + +/*! + * @brief A more generic interface around pthread_create + * @details pthread_create has a limited interface, pass a void *, + * std::thread doesn't allow specifying stack size, scheduling, ... + * so provide a wrapper on pthread create that takes a more generic + * argument + * @param tid The pthread_t thread id structure + * @param attr Structure describing pthread attributes to set + * @param handler the code to run on the new thread + */ +extern int launch_pthread( pthread_t& tid, + const pthread_attr_t& attr, + thread_action_t handler ); + +/*! + * @brief wrap the pthread scope macros in a enum to + * bring them into the C++ type system. + */ +enum class thread_scope_t +{ + SYSTEM = PTHREAD_SCOPE_SYSTEM, + PROCESS = PTHREAD_SCOPE_PROCESS, +}; + +/*! + * @brief a strong type for a stacksize. + */ +class thread_stacksize_t +{ +public: + explicit thread_stacksize_t( std::size_t size ) : size_( size ) + { + } + thread_stacksize_t( const thread_stacksize_t& ) = default; + thread_stacksize_t& + operator=( const thread_stacksize_t& ) throw( ) = default; + thread_stacksize_t& + operator=( const std::size_t new_size ) throw( ) + { + size_ = new_size; + return *this; + } + std::size_t + get( ) const + { + return size_; + } + +private: + std::size_t size_; +}; + +/*! + * @brief a RAII wrapper for pthread_attr_t, ensuring that it is always + * destroyed. The constructor also uses strong types to all an arbitrary + * set of attributes to be set at construction time. + */ +class thread_attr_t +{ +public: + thread_attr_t( ) : attr_{} + { + pthread_attr_init( &attr_ ); + } + + /*! + * @brief construct a thread_attr_t and set attributes on it. + * @tparam T one of the attribute types that can be set + * @tparam Args list of additional attribute types + * @param t the first attribute to set + * @param args additional (0 or more) attributes to set + */ + template < typename T, typename... Args > + explicit thread_attr_t( T t, Args... args ) : attr_{} + { + pthread_attr_init( &attr_ ); + set( t, args... ); + } + + thread_attr_t( const thread_attr_t& ) = delete; + thread_attr_t( thread_attr_t&& ) = delete; + + ~thread_attr_t( ) + { + pthread_attr_destroy( &attr_ ); + } + + thread_attr_t& operator=( const thread_attr_t& ) = delete; + thread_attr_t& operator=( thread_attr_t&& ) = delete; + + /*! + * @brief set the PTHREAD_SCOPE value + * @param scope the scope value as an enum + */ + void + set( thread_scope_t scope ) + { + pthread_attr_setscope( &attr_, static_cast< int >( scope ) ); + } + /*! + * @brief set the stack size + * @param stack_size the size of the stack + */ + void + set( thread_stacksize_t stack_size ) + { + pthread_attr_setstacksize( &attr_, stack_size.get( ) ); + } + /*! + * @brief take an arbitrary list of attribute types and set them. + * @tparam T First attribute type to set + * @tparam Args Additional (0+) types of attributes + * @param t the first attribute + * @param args additional attributes + */ + template < typename T, typename... Args > + void + set( T t, Args... args ) + { + set( t ); + set( args... ); + } + + pthread_attr_t& + at( ) + { + return attr_; + } + pthread_attr_t* + get( ) + { + return &attr_; + } + +private: + pthread_attr_t attr_; +}; + +/*! + * @brief a container of threads, that ensure that each thread is joined + * @details The handler is given a stop function which is called (once) to stop + * all the threads that are associated with it. It is the responsibility of the + * caller to ensure that the stop function will indeed stop all the threads that + * will be managed by the thread_handler_t. + */ +class thread_handler_t +{ +public: + using stop_function_t = std::function< void( void ) >; + + explicit thread_handler_t( stop_function_t stopper ) + : stopper_{ std::move( stopper ) } {}; + thread_handler_t( const thread_handler_t& ) = delete; + thread_handler_t( thread_handler_t&& ) = delete; + thread_handler_t& operator=( const thread_handler_t ) = delete; + thread_handler_t& operator=( thread_handler_t&& ) = delete; + + ~thread_handler_t( ); + + void + push_back( thread_action_t action, thread_attr_t& attr ) + { + std::lock_guard< std::mutex > l_{ m_ }; + std::vector< pthread_t > ids( thread_ids_.begin( ), + thread_ids_.end( ) ); + ids.emplace_back( ); + if ( launch_pthread( ids.back( ), attr.at( ), std::move( action ) ) != + 0 ) + { + throw std::runtime_error( "Unable to create thread" ); + } + thread_ids_.swap( ids ); + } + + void clear( ); + +private: + std::vector< pthread_t > thread_ids_{}; + std::mutex m_{}; + stop_function_t stopper_; +}; + +#endif // DAQD_TRUNK_DAQD_THREAD_HH diff --git a/src/daqd/producer_shmem.cc b/src/daqd/producer_shmem.cc index 2e75e97d12e4bffa34760e2d37b4bc177f7d195a..28989d0094a6126c3a985e27d64b012036c48f90 100644 --- a/src/daqd/producer_shmem.cc +++ b/src/daqd/producer_shmem.cc @@ -60,7 +60,7 @@ using namespace std; #include "circ.h" #include "daq_core.h" #include "shmem_receiver.hh" -#include "thread_launcher.hh" +#include "daqd_thread.hh" extern daqd_c daqd; extern int shutdown_server( ); diff --git a/src/daqd/tests/test_thread_launcher.cc b/src/daqd/tests/test_daqd_thread.cc similarity index 95% rename from src/daqd/tests/test_thread_launcher.cc rename to src/daqd/tests/test_daqd_thread.cc index 6764cb66d3444581c2bf6eef71a715c496020b52..14707ab33a6b3d7d06ca6d7f50805b56d1611b9f 100644 --- a/src/daqd/tests/test_thread_launcher.cc +++ b/src/daqd/tests/test_daqd_thread.cc @@ -4,7 +4,7 @@ #include <chrono> #include <thread> -#include "thread_launcher.hh" +#include "daqd_thread.hh" TEST_CASE( "You can launch a pthread with the launch_pthread call" ) { diff --git a/src/daqd/thread_launcher.hh b/src/daqd/thread_launcher.hh deleted file mode 100644 index 73093618f335501a29d535911648b556dd3c7478..0000000000000000000000000000000000000000 --- a/src/daqd/thread_launcher.hh +++ /dev/null @@ -1,28 +0,0 @@ -// -// Created by jonathan.hanks on 3/26/20. -// - -#ifndef DAQD_TRUNK_THREAD_LAUNCHER_HH -#define DAQD_TRUNK_THREAD_LAUNCHER_HH - -#include <functional> - -#include <pthread.h> - -using thread_handler_t = std::function< void( void ) >; - -/*! - * @brief A more generic interface around pthread_create - * @details pthread_create has a limited interface, pass a void *, - * std::thread doesn't allow specifying stack size, scheduling, ... - * so provide a wrapper on pthread create that takes a more generic - * argument - * @param tid The pthread_t thread id structure - * @param attr Structure describing pthread attributes to set - * @param handler the code to run on the new thread - */ -extern int launch_pthread( pthread_t& tid, - const pthread_attr_t& attr, - thread_handler_t handler ); - -#endif // DAQD_TRUNK_THREAD_LAUNCHER_HH diff --git a/src/daqd/trend.cc b/src/daqd/trend.cc index 83b37a588db7df1970ede913daa39d7cb2758683..30563f05859537f08f198f9f6c80f14ba27d18ca 100644 --- a/src/daqd/trend.cc +++ b/src/daqd/trend.cc @@ -63,11 +63,16 @@ trender_c::raw_minute_saver( ) for ( int i = 0; i < rmp; i++ ) cur_blk[ i ] = new trend_block_t[ num_channels ]; - for ( minute_put_cntr = 0;; minute_put_cntr++ ) + for ( minute_put_cntr = 0; !stopping( ); minute_put_cntr++ ) { int eof_flag = 0; - int nb = mtb->get( raw_msaver_cnum ); + int nb = time_get_helper( *mtb, raw_msaver_cnum ); + if ( nb < 0 ) + { + return nullptr; + } + DEBUG( 3, cerr << "raw minute trender saver; block " << nb << endl ); { cur_prop[ minute_put_cntr % rmp ] = mtb->block_prop( nb )->prop; @@ -224,7 +229,7 @@ trender_c::raw_minute_saver( ) } // signal to the producer that we are done - this->shutdown_minute_trender( ); + this->shutdown_trender( ); return NULL; } @@ -282,7 +287,7 @@ trender_c::minute_framer( ) catch ( ... ) { system_log( 1, "Couldn't create minute trend frame" ); - this->shutdown_minute_trender( ); + this->shutdown_trender( ); return NULL; } @@ -376,7 +381,7 @@ trender_c::minute_framer( ) PV::set_pv( PV::PV_MTREND_FW_STATE, STATE_NORMAL ); long frame_cntr; - for ( frame_cntr = 0;; frame_cntr++ ) + for ( frame_cntr = 0; !stopping( ); frame_cntr++ ) { int eof_flag = 0; circ_buffer_block_prop_t file_prop; @@ -386,7 +391,13 @@ trender_c::minute_framer( ) // Accumulate frame adc data for ( int i = 0; i < frame_length_blocks; i++ ) { - int nb = mtb->get( msaver_cnum ); + int nb = time_get_helper( *mtb, msaver_cnum ); + if ( nb < 0 ) + { + eof_flag = 1; + break; + } + DEBUG( 3, cerr << "minute trender saver; block " << nb << endl ); { if ( !mtb->block_prop( nb )->bytes ) @@ -614,7 +625,7 @@ trender_c::minute_framer( ) } // signal to the producer that we are done - this->shutdown_minute_trender( ); + this->shutdown_trender( ); return NULL; } @@ -640,20 +651,16 @@ trender_c::minute_trend( ) trend_block_t ttb[ num_channels ]; // minute trend local storage unsigned long npoints[ num_channels ]; // number of data points processed - for ( nc = 0;; ) + for ( nc = 0; !stopping( ); ) { circ_buffer_block_prop_t prop; - // Request to shut us down received from consumer - // - if ( shutdown_minute_now ) + nb = time_get_helper( *tb, mcnum ); + if ( nb < 0 ) { - // FIXME -- synchronize with the demise of the trender - // shutdown_buffer (); - continue; + return nullptr; } - nb = tb->get( mcnum ); { trend_block_t* btr = (trend_block_t*)tb->block_ptr( nb ); int bytes; @@ -770,9 +777,14 @@ trender_c::saver( ) { int nb; - for ( ;; ) + for ( ; !stopping( ); ) { - nb = tb->get( saver_cnum ); + nb = time_get_helper( *tb, saver_cnum ); + if ( nb < 0 ) + { + return nullptr; + } + { if ( !tb->block_prop( nb )->bytes ) break; @@ -938,7 +950,7 @@ trender_c::framer( ) PV::set_pv( PV::PV_STREND_FW_STATE, STATE_NORMAL ); long frame_cntr; - for ( frame_cntr = 0;; frame_cntr++ ) + for ( frame_cntr = 0; !stopping( ); frame_cntr++ ) { int eof_flag = 0; circ_buffer_block_prop_t file_prop; @@ -948,7 +960,13 @@ trender_c::framer( ) // Accumulate frame adc data for ( int i = 0; i < frame_length_blocks; i++ ) { - int nb = tb->get( saver_cnum ); + int nb = time_get_helper( *tb, saver_cnum ); + if ( nb < 0 ) + { + eof_flag = 1; + break; + } + DEBUG( 3, cerr << "second trender saver; block " << nb << endl ); { if ( !tb->block_prop( nb )->bytes ) @@ -1438,54 +1456,18 @@ trender_c::trend( ) circ_buffer* ltb = this->tb; sem_post( &trender_sem ); -#ifdef not_def - // Find out how to split the work with worker thread - { - unsigned long trend_load_size = 0; - // Calculate summary trender data load - for ( unsigned int i = 0; i < num_channels; i++ ) - trend_load_size += channels[ i ].bytes; - - system_log( 1, "Trend's work load is %d bytes", trend_load_size ); - unsigned long bsize = trend_load_size / 2; - unsigned long load_size = 0; - for ( worker_first_channel = 0; worker_first_channel < num_channels; - worker_first_channel++ ) - { - load_size += channels[ worker_first_channel ].bytes; - if ( load_size > bsize ) - break; - } - system_log( 1, - "Trend Worker's first channel is #%d (%s)", - worker_first_channel, - channels[ worker_first_channel ].name ); - } -#endif - - for ( ;; ) + for ( ; !stopping( ); ) { circ_buffer_block_prop_t prop; -#ifdef not_def - pthread_mutex_lock( &lock ); - ltb = this->tb; - pthread_mutex_unlock( &lock ); - - if ( ltb ) /* There is a trend buffer, so we need to process data */ -#endif - { - // Request to shut us down received from consumer - // - if ( shutdown_now ) + + nb = time_get_helper( *daqd.b1, cnum ); + if ( nb < 0 ) { - // FIXME -- synchronize with the demise of the minute trender - shutdown_buffer( ); - continue; + return nullptr; } - nb = daqd.b1->get( cnum ); { char* block_ptr = daqd.b1->block_ptr( nb ); unsigned long block_bytes = daqd.b1->block_prop( nb )->bytes; @@ -1557,21 +1539,9 @@ trender_c::trend( ) DEBUG( 3, cerr << "trender consumer " << prop.gps << endl ); -#ifdef not_def - for ( register int j = 0; j < num_channels; j++ ) - { - ttb[ j ].min.F = ttb[ j ].max.F = j; - ttb[ j ].rms = j; - } -#endif ltb->put( (char*)ttb, block_size, &prop ); } -#ifdef not_def - else /* Do stuff required for the synchronization */ - daqd.b1->noop( cnum ); -#endif } - return NULL; } @@ -1582,14 +1552,23 @@ trender_c::trend_worker( ) daqd_c::set_thread_priority( "Second trend worker", "dqstrwk", 0, SECOND_TREND_WORKER_CPUAFFINITY ); - for ( ;; ) + for ( ; !stopping( ); ) { // get control from the trender thread pthread_mutex_lock( &worker_lock ); - while ( !worker_busy ) - pthread_cond_wait( &worker_notempty, &worker_lock ); + while ( !worker_busy || !stopping( ) ) + { + timespec ts{}; + timespec_get( &ts, TIME_UTC ); + ts.tv_sec++; + pthread_cond_timedwait( &worker_notempty, &worker_lock, &ts ); + } int nb = trend_worker_nb; pthread_mutex_unlock( &worker_lock ); + if ( stopping( ) ) + { + break; + } char* block_ptr = daqd.b1->block_ptr( nb ); @@ -1772,22 +1751,15 @@ trender_c::start_trend( ostream* yyout, // Start minute trender if ( ( mcnum = tb->add_consumer( ) ) >= 0 ) { - pthread_attr_t attr; - pthread_attr_init( &attr ); - pthread_attr_setstacksize( &attr, daqd.thread_stack_size ); - pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM ); - pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); - int err_no; - if ( err_no = pthread_create( &mconsumer, - &attr, - (void* (*)(void*))minute_trend_static, - (void*)this ) ) + thread_attr_t attr( thread_scope_t::SYSTEM, + thread_stacksize_t( daqd.thread_stack_size ) ); + try { - strerror_r( err_no, errmsgbuf, sizeof( errmsgbuf ) ); - system_log( 1, - "couldn't create minute trend consumer thread; " - "pthread_create() err=%s", - errmsgbuf ); + threads_.push_back( [this]( ) { minute_trend( ); }, attr ); + } + catch ( ... ) + { + system_log( 1, "couldn't create minute trend consumer thread; " ); tb->delete_consumer( mcnum ); mcnum = 0; tb->~circ_buffer( ); @@ -1796,13 +1768,11 @@ trender_c::start_trend( ostream* yyout, mtb->~circ_buffer( ); free( (void*)mtb ); mtb = 0; - pthread_attr_destroy( &attr ); + return 1; } - pthread_attr_destroy( &attr ); - DEBUG( 2, - cerr << "minute trend consumer created; tid=" << mconsumer - << endl ); + + DEBUG( 2, cerr << "minute trend consumer created;" << endl ); } else { @@ -1821,36 +1791,25 @@ trender_c::start_trend( ostream* yyout, { sem_wait( &trender_sem ); - pthread_attr_t attr; - pthread_attr_init( &attr ); - pthread_attr_setstacksize( &attr, daqd.thread_stack_size ); - pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM ); - pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); - int err_no; - - if ( err_no = pthread_create( &worker_tid, - &attr, - (void* (*)(void*))trend_worker_static, - (void*)this ) ) + thread_attr_t attr( thread_stacksize_t( daqd.thread_stack_size ), + thread_scope_t::SYSTEM ); + try { - strerror_r( err_no, errmsgbuf, sizeof( errmsgbuf ) ); - system_log( 1, - "couldn't create second trend worker thread; " - "pthread_create() err=%s", - errmsgbuf ); + threads_.push_back( [this]( ) { trend_worker( ); }, attr ); + } + catch ( ... ) + { + system_log( 1, "couldn't create second trend worker thread;" ); abort( ); } - if ( err_no = pthread_create( &consumer, - &attr, - (void* (*)(void*))trend_static, - (void*)this ) ) + try { - strerror_r( err_no, errmsgbuf, sizeof( errmsgbuf ) ); - system_log( 1, - "couldn't create second trend consumer thread; " - "pthread_create() err=%s", - errmsgbuf ); + threads_.push_back( [this]( ) { trend( ); }, attr ); + } + catch ( ... ) + { + system_log( 1, "couldn't create second trend consumer thread; " ); // FIXME: have to cancel minute trend thread here first !!! abort( ); @@ -1863,13 +1822,10 @@ trender_c::start_trend( ostream* yyout, mtb->~circ_buffer( ); free( (void*)mtb ); mtb = 0; - pthread_attr_destroy( &attr ); return 1; } - pthread_attr_destroy( &attr ); - DEBUG( 2, - cerr << "second trend consumer created; tid=" << consumer - << endl ); + + DEBUG( 2, cerr << "second trend consumer created;" << endl ); } else { @@ -1909,41 +1865,27 @@ trender_c::start_raw_minute_trend_saver( ostream* yyout ) << endl; return 1; } - // error message buffer - char errmsgbuf[ 80 ]; - pthread_attr_t attr; - pthread_attr_init( &attr ); - pthread_attr_setstacksize( &attr, daqd.thread_stack_size ); - pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM ); - pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); + thread_attr_t attr( thread_stacksize_t( daqd.thread_stack_size ), + thread_scope_t::SYSTEM ); /* Start raw minute trend file saver thread */ - int err_no; - if ( err_no = pthread_create( - &mtraw, - &attr, - (void* (*)(void*))daqd.trender.raw_minute_trend_saver_static, - (void*)this ) ) + try { - strerror_r( err_no, errmsgbuf, sizeof( errmsgbuf ) ); - system_log( 1, - "couldn't create raw minute trend saver thread; " - "pthread_create() err=%s", - errmsgbuf ); + threads_.push_back( [this]( ) { raw_minute_saver( ); }, attr ); + } + catch ( ... ) + { + system_log( 1, "couldn't create raw minute trend saver thread; " ); // FIXME: have to cancel frame saver thread here abort( ); mtb->delete_consumer( raw_msaver_cnum ); raw_msaver_cnum = 0; - pthread_attr_destroy( &attr ); return 1; } - pthread_attr_destroy( &attr ); - DEBUG( 2, - cerr << "raw minute trend saver thread started; tid=" << mtraw - << endl ); + DEBUG( 2, cerr << "raw minute trend saver thread started;" << endl ); return 0; } @@ -1968,42 +1910,26 @@ trender_c::start_minute_trend_saver( ostream* yyout ) return 1; } - // error message buffer - char errmsgbuf[ 80 ]; - sem_wait( &minute_frame_saver_sem ); - pthread_attr_t attr; - pthread_attr_init( &attr ); - pthread_attr_setstacksize( &attr, daqd.thread_stack_size ); - pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM ); - pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); + thread_attr_t attr( thread_stacksize_t( daqd.thread_stack_size ), + thread_scope_t::SYSTEM ); /* Start minute trend frame saver thread */ - int err_no; - if ( err_no = pthread_create( - &mtsaver, - &attr, - (void* (*)(void*))daqd.trender.minute_trend_framer_static, - (void*)this ) ) + try { - strerror_r( err_no, errmsgbuf, sizeof( errmsgbuf ) ); - system_log( 1, - "couldn't create minute trend framer thread; " - "pthread_create() err=%s", - errmsgbuf ); - + threads_.push_back( [this]( ) { minute_framer( ); }, attr ); + } + catch ( ... ) + { + system_log( 1, "couldn't create minute trend framer thread; " ); // FIXME: have to cancel frame saver thread here abort( ); mtb->delete_consumer( msaver_cnum ); msaver_cnum = 0; - pthread_attr_destroy( &attr ); return 1; } - pthread_attr_destroy( &attr ); - DEBUG( 2, - cerr << "minute trend framer thread started; tid=" << mtsaver - << endl ); + DEBUG( 2, cerr << "minute trend framer thread started;" << endl ); } return 0; @@ -2028,14 +1954,8 @@ trender_c::start_trend_saver( ostream* yyout ) return 1; } - // error message buffer - char errmsgbuf[ 80 ]; - - pthread_attr_t attr; - pthread_attr_init( &attr ); - pthread_attr_setstacksize( &attr, daqd.thread_stack_size ); - pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM ); - pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); + thread_attr_t attr( thread_stacksize_t( daqd.thread_stack_size ), + thread_scope_t::SYSTEM ); if ( ascii_output ) { @@ -2047,52 +1967,34 @@ trender_c::start_trend_saver( ostream* yyout ) } /* Start trend saver consumer thread */ - int err_no; - if ( err_no = - pthread_create( &tsaver, - &attr, - (void* (*)(void*))daqd.trender.saver_static, - (void*)this ) ) + try { - strerror_r( err_no, errmsgbuf, sizeof( errmsgbuf ) ); - system_log( 1, - "couldn't create ascii trend saver thread; " - "pthread_create() err=%s", - errmsgbuf ); + threads_.push_back( [this]( ) { saver( ); }, attr ); + } + catch ( ... ) + { + system_log( 1, "couldn't create ascii trend saver thread;" ); tb->delete_consumer( saver_cnum ); - pthread_attr_destroy( &attr ); return 1; } - pthread_attr_destroy( &attr ); - DEBUG( 2, - cerr << "ASCII trend saver thread started; tid=" << tsaver - << endl ); + DEBUG( 2, cerr << "ASCII trend saver thread started;" << endl ); } else { sem_wait( &frame_saver_sem ); /* Start second trend framer thread */ - int err_no; - if ( err_no = pthread_create( - &tsaver, - &attr, - (void* (*)(void*))daqd.trender.trend_framer_static, - (void*)this ) ) + try { - strerror_r( err_no, errmsgbuf, sizeof( errmsgbuf ) ); - system_log( 1, - "couldn't create second trend framer thread; " - "pthread_create() err=%s", - errmsgbuf ); + threads_.push_back( [this]( ) { framer( ); }, attr ); + } + catch ( ... ) + { + system_log( 1, "couldn't create second trend framer thread;" ); tb->delete_consumer( saver_cnum ); - pthread_attr_destroy( &attr ); return 1; } - pthread_attr_destroy( &attr ); - DEBUG( 2, - cerr << "second trend framer thread started; tid=" << tsaver - << endl ); + DEBUG( 2, cerr << "second trend framer thread started;" << endl ); } return 0; diff --git a/src/daqd/trend.hh b/src/daqd/trend.hh index d5ac88be4c6e82389da836a68e5b6708541db055..f828368b321e3583047f35ad6030d8e642f7b6dd 100644 --- a/src/daqd/trend.hh +++ b/src/daqd/trend.hh @@ -4,6 +4,8 @@ #include "daqd.hh" #include "profiler.hh" #include "stats/stats.hh" +#include "daqd_thread.hh" +#include <atomic> /// Trend circular buffer block structure /// Do not put any shorts in this structure, because compiler puts holes in to @@ -83,6 +85,28 @@ public: ~trender_c( ) { + // stop the threads before the rest of the cleanup. + threads_.clear( ); + + { + circ_buffer* oldb; + + if ( tb ) + { + oldb = tb; + tb = 0; + oldb->~circ_buffer( ); + free( (void*)oldb ); + } + + if ( mtb ) + { + oldb = mtb; + mtb = 0; + oldb->~circ_buffer( ); + free( (void*)oldb ); + } + } pthread_cond_destroy( &worker_done ); pthread_cond_destroy( &worker_notempty ); pthread_mutex_destroy( &worker_lock ); @@ -94,7 +118,7 @@ public: } trender_c( ) - : shutdown_now( 0 ), shutdown_minute_now( 0 ), + : threads_{ [this]( ) { shutdown_trender( ); } }, minute_trend_buffer_blocks( 60 ), mtb( 0 ), mt_stats( ), mt_file_stats( ), tb( 0 ), num_channels( 0 ), num_trend_channels( 0 ), block_size( 0 ), ascii_output( 0 ), frames_per_file( 1 ), @@ -170,15 +194,19 @@ public: circ_buffer* mtb; ///< minute trend circular buffer object class stats mt_stats; ///< minute trend period stats class stats mt_file_stats; ///< minute trend file saving stats - pthread_t tsaver; ///< This thread saves trend data into the `fname' - pthread_t mtsaver; ///< This thread saves minute trend data into the `fname' - pthread_t mtraw; ///< This thread saves minute trend data to raw minute - ///< trend files - pthread_t consumer; ///< Thread reads data from the main circular buffer, - ///< calculates trend and puts it into `tb' - pthread_t - mconsumer; ///< Thread reads data from the trend circular buffer, `tb', - ///< calculates minute trend and puts it into `mtb' + + /*! + * manage the threads + * mconsumer - Thread reads data from the trend circular buffer, `tb', + * calculates minute trend and puts it into `mtb' + * worker_tid + * consumer - Thread reads data from the main circular buffer, calculates + * trend and puts it into `tb' + * mtraw - This thread saves minute trend data to raw minute trend files + * mtsaver - This thread saves minute trend data into the `fname' + * tsaver - This thread saves trend data into the `fname' + */ + thread_handler_t threads_; int ascii_output; ///< If set, no frame files, just plain ascii trend file ///< is created ofstream* fout; @@ -250,42 +278,14 @@ public: return ( (trender_c*)a )->trend( ); }; void* trend_worker( ); - static void* - trend_worker_static( void* a ) - { - return ( (trender_c*)a )->trend_worker( ); - }; + void* saver( ); - static void* - saver_static( void* a ) - { - return ( (trender_c*)a )->saver( ); - }; void* framer( ); - static void* - trend_framer_static( void* a ) - { - return ( (trender_c*)a )->framer( ); - }; void* minute_trend( ); - static void* - minute_trend_static( void* a ) - { - return ( (trender_c*)a )->minute_trend( ); - }; + void* minute_framer( ); void* raw_minute_saver( ); - static void* - minute_trend_framer_static( void* a ) - { - return ( (trender_c*)a )->minute_framer( ); - }; - static void* - raw_minute_trend_saver_static( void* a ) - { - return ( (trender_c*)a )->raw_minute_saver( ); - }; profile_c profile; ///< profile on trend circular buffer. profile_c profile_mt; ///< profile on minute trend circular buffer. @@ -307,8 +307,6 @@ public: pthread_cond_t worker_done; unsigned int worker_busy; ///< worker clears it when it is finished - pthread_t worker_tid; - /// worker will start processing channels in the direction from the end to /// the start trender will do from the start to the end. They will meet at /// some point, when next_trender_block >= next_worker_block. @@ -333,36 +331,44 @@ public: } private: - int _configuration_number; - int shutdown_now; - int shutdown_minute_now; + int _configuration_number; + std::atomic_bool shutdown_now_{ false }; void shutdown_trender( ) { - shutdown_now = 1; + shutdown_now_ = false; } - void - shutdown_minute_trender( ) + + bool + stopping( ) const { - shutdown_minute_now = 1; + return shutdown_now_.load( ); } - /// kill the buffer and close the socket - void - shutdown_buffer( ) - { - circ_buffer* oldb; - - oldb = tb; - tb = 0; - oldb->~circ_buffer( ); - free( (void*)oldb ); - oldb = mtb; - mtb = 0; - oldb->~circ_buffer( ); - free( (void*)oldb ); - - shutdown_now = 0; + /*! + * @brief helper to do a get from a buffer that can time out if stopping() + * @param cbuffer buffer to check + * @param consumer_number + * @return -1 if stopping() else the next buffer number + * @note on -1 it calls cbuffer.unlock() + */ + int + time_get_helper( circ_buffer& cbuffer, int consumer_number ) + { + int nb = -1; + do + { + timespec ts{}; + timespec_get( &ts, TIME_UTC ); + ts.tv_sec++; + nb = cbuffer.timed_get( consumer_number, &ts ); + if ( stopping( ) ) + { + cbuffer.unlock( consumer_number ); + return -1; + } + } while ( nb < 0 ); + return nb; } }; // class trender_c