diff --git a/CMakeLists.txt b/CMakeLists.txt index 4febffa5f868145e0fd8e1e143ff822a6c57e2c0..0525af94572fca5b6a4eb131a7e36d48ba4cde7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,7 @@ FIND_PACKAGE(RapidJSON) FIND_PACKAGE(RPC) FIND_PACKAGE(libcds-pubsub) FIND_PACKAGE(Python3) +find_package(pybind11) CHECK_CXX_SOURCE_COMPILES("#include <iostream> #include <FlexLexer.h> diff --git a/src/daqd/CMakeLists.txt b/src/daqd/CMakeLists.txt index a0242c612da7553328bc74e92283d83d70f259a9..b3d3c1b6374afef224e4f94a4d32730c8a22dac0 100644 --- a/src/daqd/CMakeLists.txt +++ b/src/daqd/CMakeLists.txt @@ -177,6 +177,7 @@ target_include_directories(test_daqd_unit_tests PUBLIC target_link_libraries(test_daqd_unit_tests PUBLIC catch2 ldastools::framecpp + driver::shmem ${CMAKE_THREAD_LIBS_INIT}) add_test(NAME test_daqd_unit_tests COMMAND test_daqd_unit_tests @@ -226,6 +227,14 @@ if (libNDS2Client_FOUND) COMMAND /bin/bash ./test_daqd_live_nds.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + configure_file(tests/daqdrc_live_posix_shmem_test ${CMAKE_CURRENT_BINARY_DIR}/daqdrc_live_posix_shmem_test COPYONLY) + configure_file(tests/test_daqd_live_nds_posix_shmem.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test_daqd_live_nds_posix_shmem.sh @ONLY) + + add_test(NAME test_daqd_live_nds_posix_shmem + COMMAND /bin/bash ./test_daqd_live_nds_posix_shmem.sh + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + + configure_file(tests/daqdrc_nds_test ${CMAKE_CURRENT_BINARY_DIR}/daqdrc_nds_test COPYONLY) configure_file(tests/test_daqd_nds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test_daqd_nds.sh @ONLY) diff --git a/src/daqd/shmem_receiver.hh b/src/daqd/shmem_receiver.hh index 4043156e3d8c20bbd7bd00c82695f0bccb7fcd21..f9b4821914b9ef1d434cbc8e379777ed7683a2b8 100644 --- a/src/daqd/shmem_receiver.hh +++ b/src/daqd/shmem_receiver.hh @@ -18,14 +18,26 @@ class ShMemReceiver { + static std::size_t + restrict_size( std::size_t s ) + { + const std::size_t MB = 1024 * 1024; + if ( s % MB != 0 ) + { + throw std::invalid_argument( + "shared memory buffer sizes must be multiples of 2**20" ); + } + return s / MB; + } + public: ShMemReceiver( ) = delete; ShMemReceiver( ShMemReceiver& other ) = delete; ShMemReceiver( const std::string& endpoint, std::size_t shmem_size, std::function< void( void ) > signal = []( ) {} ) - : shmem_( static_cast< volatile daq_multi_cycle_data_t* >( - shmem_open_segment( endpoint.c_str( ), shmem_size ) ) ), + : shmem_obj_( endpoint.c_str( ), restrict_size( shmem_size ) ), + shmem_( shmem_obj_.mapping< daq_multi_cycle_data_t >( ) ), shmem_size_( shmem_size ), signal_stalled_{ std::move( signal ) } { } @@ -33,7 +45,8 @@ public: ShMemReceiver( volatile daq_multi_cycle_data_t* shmem_area, std::size_t shmem_size, std::function< void( void ) > signal = []( ) {} ) - : shmem_( shmem_area ), + : shmem_obj_( shmem_area, shmem_size, nullptr ), + shmem_( shmem_obj_.mapping< daq_multi_cycle_data_t >( ) ), shmem_size_( shmem_size ), signal_stalled_{ std::move( signal ) } { } @@ -192,6 +205,7 @@ private: std::this_thread::sleep_for( std::chrono::milliseconds( 3 ) ); } + shmem::shmem shmem_obj_; volatile daq_multi_cycle_data_t* shmem_; std::size_t shmem_size_; diff --git a/src/daqd/tests/daqdrc_live_posix_shmem_test b/src/daqd/tests/daqdrc_live_posix_shmem_test new file mode 100644 index 0000000000000000000000000000000000000000..d7fa72adc180401521d0728d1aba4a1865ded4e4 --- /dev/null +++ b/src/daqd/tests/daqdrc_live_posix_shmem_test @@ -0,0 +1,82 @@ +set thread_stack_size=10240; +#set cit_40m=1; +set dcu_status_check=1; +#set symm_gps_offset=-1; +#set controller_dcu=22; +set debug=0; +set log=6; +set zero_bad_data=0; +set master_config="MASTER"; +configure channels begin end; +#tpconfig "TESTPOINT"; + +status dcu; +#tpconfig "TESTPOINT"; + +set gps_leaps = 820108813; +set detector_name="TST"; +set detector_prefix="X6"; +set detector_longitude=-90.7742403889; +set detector_latitude=30.5628943337; +set detector_elevation=.0; +set detector_azimuths=1.1,4.7123889804; +set detector_altitudes=1.0,2.0; +set detector_midpoints=2000.0, 2000.0; + +#enable frame_wiper; +#set num_dirs = 10; +#set frames_per_dir=225; +#set full_frames_per_file=1; +#set full_frames_blocks_per_frame=32; +#set frame_dir="/frames/full", "M-R-", ".gwf"; +#scan frames; + +#enable trend_frame_wiper; +#set trend_num_dirs=10; +#set trend_frames_per_dir=1440; +#set trend_frame_dir= "/frames/trend/second", "M-T-", ".gwf"; + +#set raw-minute-trend-dir="/frames/trend/minute/raw"; + +#set nds-jobs-dir="/opt/fb"; + +set parameter "shmem_input" = "shm://local_dc"; +set parameter "shmem_size" = "104857600"; + + +#enable minute-trend-frame-wiper; +#set minute-trend-num-dirs=10; +#set minute-trend-frames-per-dir=24; +#set minute-trend-frame-dir="/frames/trend/minute", "M-M-", ".gwf"; +# +#scan minute-trend-frames; +#scan trend-frames; +scan frames; + +start main 5; +start profiler; + +# comment out this block to stop saving data + +#start frame-saver; +#sync frame-saver; +start trender; +#start trend-frame-saver; +#sync trend-frame-saver; +#start minute-trend-frame-saver; +#sync minute-trend-frame-saver; +#start raw-minute-trend-saver; + +#start fast-writer "127.255.255.255" broadcast="127.0.0.0" all; +#sleep 2; +#sleep 5; + +start producer; +#start epics dcu; +;start epics server "X3:DAQ-SHM0_" "X3:DAQ-SHM0_"; + +#start listener 8087; +start listener 8088 1; +# for this test we do not need to clear the crcs +#sleep 60; +#clear crc; diff --git a/src/daqd/tests/test_daqd_live_nds_posix_shmem.sh.in b/src/daqd/tests/test_daqd_live_nds_posix_shmem.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..b2fd9a5ee705fd55797266a32d511e8aab816a47 --- /dev/null +++ b/src/daqd/tests/test_daqd_live_nds_posix_shmem.sh.in @@ -0,0 +1,110 @@ +#!/bin/bash + +CWD="@CMAKE_CURRENT_BINARY_DIR@" + +TDIR="" +PID_MULTI_STREAM=0 +PID_DAQD=0 + +function kill_proc { + if [ $1 -gt 0 ]; then + echo "Closing process $1" + kill $1 + fi +} + +function cleanup { + rm -rf daqdrc_live_test_final + if [ "x$TDIR" != "x" ]; then + if [ -d $TDIR ]; then + rm -rf "$TDIR" + fi + fi + for name in local_dc; do + if [ -e "/dev/shm/$name" ]; then + rm -f "/dev/shm/$name" + fi + done + kill_proc $PID_MULTI_STREAM + kill_proc $PID_DAQD +} + +MUTLI_STREAM="$CWD/../fe_stream_test/fe_multi_stream_test" +if [ ! -x "$MUTLI_STREAM" ]; then + echo "cannot find $MULTI_STREAM" + exit 1 +fi + +DAQD="$CWD/../daqd/daqd" +if [ ! -x "$DAQD" ]; then + echo "cannot find $DAQD" + exit 1 +fi + +FE_STREAM_CHECK_NDS="$CWD/../fe_stream_test/fe_stream_check_nds" +if [ ! -x "$FE_STREAM_CHECK_NDS" ]; then + echo "cannot find $FE_STREAM_CHECK_NDS" + exit 1 +fi + +if [ ! -r /dev/gpstime ]; then + echo "the gpstime module must be loaded, configured, and accessible by this user" + exit 1 +fi + +PYTHON="" +which python > /dev/null +if [ $? -eq 0 ]; then + PYTHON=`which python` +else + which python3 > /dev/null + if [ $? -eq 0 ]; then + PYTHON=`which python3` + else + echo "Cannot find python or python3" + exit 1 + fi +fi + +trap cleanup EXIT + +TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"` +mkdir "$TDIR/ini_files" +mkdir "$TDIR/logs" + +echo "Ini dir = $TDIR/ini_files" + +"$MUTLI_STREAM" -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -b local_dc -m 100 -k 300 -R 247 -s shm:// > "$TDIR/logs/multi_stream" & +PID_MULTI_STREAM=$! + +echo "Streamer PID = PID_MULTI_STREAM" + +sleep 1 + +MASTER_FILE="$TDIR/ini_files/master" +TESTPOINT_FILE="" +cat daqdrc_live_posix_shmem_test | sed s\|MASTER\|$MASTER_FILE\| | sed s\|TESTPOINT\|$TESTPOINT_FILE\| > daqdrc_live_posix_shmem_test_final +"$DAQD" -c daqdrc_live_posix_shmem_test_final &> "$TDIR/logs/daqd" & +PID_DAQD=$! + +echo "Sleeping to allow the daq to start" +sleep 15 + +"$FE_STREAM_CHECK_NDS" -c 100 -t -2 +RESULT_ABS=$? + +echo "Testing return $RESULT_ABS (0 = success)" + +"$FE_STREAM_CHECK_NDS" -c 1000 +RESULT_STREAM=$? + +echo "Testing return $RESULT_STREAM (0 = success)" + +let RESULT=RESULT_ABS+RESULT_STREAM + +echo "Final RESULT=$RESULT" + +exit $RESULT +#echo "Press enter to continue..." +#DUMMY="" +#read DUMMY \ No newline at end of file diff --git a/src/drv/CMakeLists.txt b/src/drv/CMakeLists.txt index a7bac44ce97c2c4b241b03fecd818627efc5f2e4..3512dd993253ab9e428469aad90f417118c0d126 100644 --- a/src/drv/CMakeLists.txt +++ b/src/drv/CMakeLists.txt @@ -1,8 +1,10 @@ add_subdirectory(mbuf/mbuf_probe) +# add_subdirectory(python) add_library(shmem STATIC shmem.c rfm.c) target_include_directories(shmem PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) target_include_directories(shmem PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/mbuf) +target_link_libraries(shmem PUBLIC rt) add_library(driver::shmem ALIAS shmem) diff --git a/src/drv/mbuf/mbuf_probe/mbuf_probe.cc b/src/drv/mbuf/mbuf_probe/mbuf_probe.cc index 2cff9cbc1894933c8e3e11225aa200e678f76e56..649ca410d0df08d039377ab6ff9c1b24922a526b 100644 --- a/src/drv/mbuf/mbuf_probe/mbuf_probe.cc +++ b/src/drv/mbuf/mbuf_probe/mbuf_probe.cc @@ -53,7 +53,7 @@ usage( const char* progname ) std::cout << "\t-m <buffer size in MB> - The size of the buffer in megabytes\n"; std::cout << "\t-S <buffer size> - Buffer size in bytes (must be a " - "multiple of 4k)\n"; + "multiple of 1MB)\n"; std::cout << "\t-o <filename> - Output file for the copy operation " "(defaults to probe_out.bin)\n"; std::cout << "\t-i <filename> - Path to the master ini file (only used " @@ -96,10 +96,8 @@ parse_options( int argc, char* argv[] ) struct_lookup.insert( std::make_pair( "rmipcstr", MBUF_RMIPC ) ); struct_lookup.insert( std::make_pair( "daq_multi_cycle", MBUF_DAQ_MULTI_DC ) ); - struct_lookup.insert( - std::make_pair( "awg_data", MBUF_AWG_DATA) ); - struct_lookup.insert( - std::make_pair( "AWG_DATA", MBUF_AWG_DATA) ); + struct_lookup.insert( std::make_pair( "awg_data", MBUF_AWG_DATA ) ); + struct_lookup.insert( std::make_pair( "AWG_DATA", MBUF_AWG_DATA ) ); std::deque< std::string > args; for ( int i = 1; i < argc; ++i ) @@ -351,8 +349,8 @@ list_shmem_segments( ) int handle_analyze( const ConfigOpts& opts ) { - volatile void* buffer = - shmem_open_segment( opts.buffer_name.c_str( ), opts.buffer_size ); + shmem::shmem mem( opts.buffer_name.c_str( ), opts.buffer_size ); + auto* buffer = mem.mapping< void >( ); switch ( opts.analysis_type ) { case MBUF_RMIPC: @@ -362,11 +360,12 @@ handle_analyze( const ConfigOpts& opts ) analyze::analyze_multi_dc( buffer, opts.buffer_size, opts ); break; case MBUF_AWG_DATA: - analyze::analyze_awg_data(buffer, opts.buffer_size, opts); + analyze::analyze_awg_data( buffer, opts.buffer_size, opts ); break; case MBUF_INVALID: default: - std::cout << "Unknown analysis type: " << opts.analysis_type << std::endl; + std::cout << "Unknown analysis type: " << opts.analysis_type + << std::endl; return ERROR; } return OK; @@ -381,8 +380,8 @@ handle_gap_check( const ConfigOpts& opts ) "valid for\n"; return ERROR; } - volatile void* buffer = - shmem_open_segment( opts.buffer_name.c_str( ), opts.buffer_size ); + shmem::shmem mem( opts.buffer_name.c_str( ), opts.buffer_size ); + volatile void* buffer = mem.mapping< void >( ); check_gap::check_gaps( buffer, opts.buffer_size ); return OK; } @@ -390,8 +389,8 @@ handle_gap_check( const ConfigOpts& opts ) int handle_check_size( const ConfigOpts& opts ) { - volatile void* buffer = - shmem_open_segment( opts.buffer_name.c_str( ), opts.buffer_size ); + shmem::shmem mem( opts.buffer_name.c_str( ), opts.buffer_size ); + volatile void* buffer = mem.mapping< void >( ); switch ( opts.analysis_type ) { case MBUF_RMIPC: @@ -423,8 +422,9 @@ main( int argc, char* argv[] ) list_shmem_segments( ); break; case CREATE: - if ( !shmem_open_segment( opts.buffer_name.c_str( ), - opts.buffer_size ) ) + { + shmem::shmem mem( opts.buffer_name.c_str( ), opts.buffer_size ); + if ( !mem.mapping< void >( ) ) { std::cout << "Unable to create/open mbuf buffer " << opts.buffer_name << "\n"; @@ -432,10 +432,11 @@ main( int argc, char* argv[] ) } shmem_inc_segment_count( opts.buffer_name.c_str( ) ); break; + } case COPY: { - volatile void* buffer = - shmem_open_segment( opts.buffer_name.c_str( ), opts.buffer_size ); + shmem::shmem mem( opts.buffer_name.c_str( ), opts.buffer_size ); + volatile void* buffer = mem.mapping< void >( ); if ( !buffer ) { std::cout << "Unable to create/open mbuf buffer " diff --git a/src/drv/mbuf/mbuf_probe/mbuf_probe.hh b/src/drv/mbuf/mbuf_probe/mbuf_probe.hh index 58b9cb04933dbc2d61465a366f9bb39e037a8f33..0c572e861b96588d559a2c79d21418e45f210d3d 100644 --- a/src/drv/mbuf/mbuf_probe/mbuf_probe.hh +++ b/src/drv/mbuf/mbuf_probe/mbuf_probe.hh @@ -83,6 +83,11 @@ struct ConfigOpts { return; } + if ( buffer_size % ( 1024 * 1024 ) != 0 ) + { + set_error( "The buffer size must be a multiple of 1 MB" ); + return; + } switch ( action ) { case CREATE: diff --git a/src/drv/shmem.c b/src/drv/shmem.c index e91e9e045b9a64e72938a908cfb9d04fb29552f2..555d55ecdc91e4d2ee8613367b176e7cf425fe98 100644 --- a/src/drv/shmem.c +++ b/src/drv/shmem.c @@ -1,4 +1,5 @@ #include <stdio.h> +#include <stdlib.h> #include <string.h> #include <ctype.h> @@ -17,59 +18,317 @@ extern "C" { #endif -int shmem_format_name(char *dest, const char *src, size_t n) +#define SHMEM_TYPE_MBUF 0 +#define SHMEM_TYPE_POSIX 1 +#define SHMEM_TYPE_WRAPPED -1 + +typedef struct shmem_mbuf_intl +{ + int fd_; +} shmem_mbuf_intl; + +typedef struct shmem_posix_intl +{ + int fd_; + char name_[ MBUF_NAME_LEN + 2 ]; +} shmem_posix_intl; + +typedef struct shmem_wrapped_intl +{ + void ( *close )( volatile void* ); +} shmem_wrapped_intl; + +typedef struct shmem_intl +{ + int shmem_type; + volatile void* mapping_; + size_t req_size_; + + void ( *close )( struct shmem_intl* ); + + union + { + shmem_mbuf_intl mbuf; + shmem_posix_intl posix; + shmem_wrapped_intl wrapped; + } intl; +} shmem_intl; + +int +shmem_format_name( char* dest, const char* src, size_t n ) { - char *cur = NULL; + char* cur = NULL; - static const char *prefix = "/rtl_mem_"; - size_t dest_len = 0; - int tmp = 0; + static const char* prefix = "/rtl_mem_"; + size_t dest_len = 0; + int tmp = 0; - if (!dest || !src || n < 1) + if ( !dest || !src || n < 1 ) return 0; - if (snprintf(dest, n, "%s%s", prefix, src) > n) + if ( snprintf( dest, n, "%s%s", prefix, src ) > n ) return 0; - for (cur = dest; *cur; ++cur) { - *cur = (char)tolower(*cur); + for ( cur = dest; *cur; ++cur ) + { + *cur = (char)tolower( *cur ); } return 1; } -volatile void* shmem_open_segment(const char *sys_name, size_t req_size) +extern const char* +shmem_name_parse( const char* input, const char** prefix ) +{ + const char* dummy = NULL; + if ( !input ) + { + return NULL; + } + if ( !prefix ) + { + prefix = &dummy; + } + if ( strncmp( input, SHMEM_MBUF_PREFIX, strlen( SHMEM_MBUF_PREFIX ) ) == 0 ) + { + *prefix = SHMEM_MBUF_PREFIX; + return input + strlen( SHMEM_MBUF_PREFIX ); + } + else if ( strncmp( input, + SHMEM_POSIX_PREFIX, + strlen( SHMEM_POSIX_PREFIX ) ) == 0 ) + { + *prefix = SHMEM_POSIX_PREFIX; + return input + strlen( SHMEM_POSIX_PREFIX ); + } + *prefix = SHMEM_MBUF_PREFIX; + return input; +} + +static void +shmem_close_mbuf( shmem_intl* intl ) +{ + if ( intl ) + { + if ( intl->mapping_ != MAP_FAILED ) + { + munmap( (void*)( intl->mapping_ ), intl->req_size_ ); + } + if ( intl->intl.mbuf.fd_ ) + { + close( intl->intl.mbuf.fd_ ); + } + } +} + +static shmem_handle +shmem_open_mbuf( const char* sys_name, size_t size_mb ) { - int fd = 0; - int buffer_len = 0; - size_t name_len = 0; - volatile void *addr = NULL; + shmem_intl* intl = NULL; + int buffer_len = 0; + size_t name_len = 0; + size_t req_size = size_mb * 1024 * 1024; - if (!sys_name || req_size == 0) + name_len = strlen( sys_name ); + if ( name_len == 0 || name_len > MBUF_NAME_LEN ) + { return NULL; - name_len = strlen(sys_name); - if (name_len == 0 || name_len > MBUF_NAME_LEN) { + } + + intl = malloc( sizeof( shmem_intl ) ); + if ( !intl ) + { return NULL; } + memset( intl, 0, sizeof( *intl ) ); + intl->shmem_type = SHMEM_TYPE_MBUF; + intl->mapping_ = MAP_FAILED; + intl->close = shmem_close_mbuf; - if ((fd = open ("/dev/mbuf", O_RDWR | O_SYNC)) < 0) { - fprintf(stderr, "Couldn't open /dev/mbuf read/write\n"); - return 0; + if ( ( intl->intl.mbuf.fd_ = open( "/dev/mbuf", O_RDWR | O_SYNC ) ) < 0 ) + { + fprintf( stderr, "Couldn't open /dev/mbuf read/write\n" ); + goto cleanup; } struct mbuf_request_struct req; req.size = req_size; - strcpy(req.name, sys_name); - buffer_len = ioctl (fd, IOCTL_MBUF_ALLOCATE, &req); + strcpy( req.name, sys_name ); + buffer_len = ioctl( intl->intl.mbuf.fd_, IOCTL_MBUF_ALLOCATE, &req ); + + if ( buffer_len < (int)req_size ) + { + goto cleanup; + } + intl->req_size_ = req_size; + intl->mapping_ = (volatile void*)mmap( 0, + req_size, + PROT_READ | PROT_WRITE, + MAP_SHARED, + intl->intl.mbuf.fd_, + 0 ); + if ( intl->mapping_ == MAP_FAILED ) + { + goto cleanup; + } + /* printf(" %s mmapped address is 0x%lx\n", sys,(long)addr); */ + return (void*)intl; + +cleanup: + shmem_close( intl ); + return NULL; +} + +static void +shmem_close_posix( shmem_intl* intl ) +{ + if ( intl ) + { + if ( intl->mapping_ != MAP_FAILED ) + { + munmap( (void*)intl->mapping_, intl->req_size_ ); + } + if ( intl->intl.posix.fd_ >= 0 ) + { + close( intl->intl.posix.fd_ ); + } + if ( strcmp( intl->intl.posix.name_, "" ) != 0 ) + { + // shm_unlink( intl->intl.posix.name_ ); + } + } +} - if (buffer_len < (int)req_size) { - close(fd); +static shmem_handle +shmem_open_posix( const char* sys_name, size_t size_mb ) +{ + shmem_intl* intl = NULL; + size_t name_len = 0; + size_t req_size = size_mb * 1024 * 1024; + int status = 0; + struct stat statbuf; + + name_len = strlen( sys_name ); + if ( name_len == 0 || name_len > MBUF_NAME_LEN ) + { return NULL; } - addr = (volatile void *)mmap(0, req_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (addr == MAP_FAILED) { - close(fd); + intl = malloc( sizeof( shmem_intl ) ); + if ( !intl ) + { return NULL; } - /* printf(" %s mmapped address is 0x%lx\n", sys,(long)addr); */ - return addr; + memset( intl, 0, sizeof( *intl ) ); + intl->shmem_type = SHMEM_TYPE_POSIX; + intl->mapping_ = MAP_FAILED; + intl->close = shmem_close_posix; + + snprintf( intl->intl.posix.name_, MBUF_NAME_LEN + 2, "/%s", sys_name ); + + intl->intl.posix.fd_ = + shm_open( intl->intl.posix.name_, O_RDWR | O_CREAT, 0666 ); + if ( intl->intl.posix.fd_ < 0 ) + { + goto cleanup; + } + status = fstat( intl->intl.posix.fd_, &statbuf ); + if ( status ) + { + goto cleanup; + } + if ( statbuf.st_size == 0 ) + { + status = ftruncate( intl->intl.posix.fd_, req_size ); + if ( status ) + { + goto cleanup; + } + } + else if ( statbuf.st_size < req_size ) + { + goto cleanup; + } + intl->req_size_ = req_size; + + intl->mapping_ = (volatile void*)mmap( 0, + req_size, + PROT_READ | PROT_WRITE, + MAP_SHARED, + intl->intl.mbuf.fd_, + 0 ); + if ( intl->mapping_ == MAP_FAILED ) + { + goto cleanup; + } + + return intl; +cleanup: + shmem_close( intl ); + return NULL; +} + +static void +shmem_close_wrapped( shmem_intl* intl ) +{ + if ( intl && intl->intl.wrapped.close ) + { + intl->intl.wrapped.close( intl->mapping_ ); + } +} + +shmem_handle +shmem_open( const char* sys_name, size_t size_mb ) +{ + if ( !sys_name || size_mb == 0 ) + return NULL; + + if ( strncmp( sys_name, SHMEM_MBUF_PREFIX, strlen( SHMEM_MBUF_PREFIX ) ) == + 0 ) + { + return shmem_open_mbuf( sys_name + strlen( SHMEM_MBUF_PREFIX ), + size_mb ); + } + else if ( strncmp( sys_name, + SHMEM_POSIX_PREFIX, + strlen( SHMEM_POSIX_PREFIX ) ) == 0 ) + { + return shmem_open_posix( sys_name + strlen( SHMEM_POSIX_PREFIX ), + size_mb ); + } + return shmem_open_mbuf( sys_name, size_mb ); +} + +void +shmem_close( shmem_handle handle ) +{ + shmem_intl* intl = (shmem_intl*)handle; + if ( intl ) + { + intl->close( intl ); + free( intl ); + } +} + +volatile void* +shmem_mapping( shmem_handle handle ) +{ + shmem_intl* intl = (shmem_intl*)handle; + return ( intl ? intl->mapping_ : NULL ); +} + +shmem_handle +shmem_wrap_memory_arbitrary( volatile void* data, + size_t size, + void ( *cleanup )( volatile void* ) ) +{ + shmem_intl* intl = malloc( sizeof( shmem_intl ) ); + if ( !intl ) + { + return NULL; + } + memset( intl, 0, sizeof( *intl ) ); + intl->mapping_ = data; + intl->req_size_ = size; + intl->close = shmem_close_wrapped; + intl->intl.wrapped.close = cleanup; + return intl; } #ifdef __cplusplus diff --git a/src/fe_stream_test/CMakeLists.txt b/src/fe_stream_test/CMakeLists.txt index afe8d9cef8517302638cf2509788e6c4657a1ed0..6693e86989431e7e7266427d59cc30a9ab624dcb 100644 --- a/src/fe_stream_test/CMakeLists.txt +++ b/src/fe_stream_test/CMakeLists.txt @@ -17,7 +17,7 @@ target_link_libraries(fe_multi_stream_test PRIVATE fe_stream_generator fe_generator_support - rt) + driver::shmem) add_executable(fe_stream_check fe_stream_check.cc @@ -43,14 +43,16 @@ add_executable(fe_stream_check_nds fe_stream_check_nds.cc) target_link_libraries(fe_stream_check_nds PRIVATE fe_stream_generator - nds2client::cxx) + nds2client::cxx + driver::shmem) add_executable(fe_stream_check_edcu_nds fe_stream_check_edcu_nds.cc) target_include_directories(fe_stream_check_edcu_nds PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) target_link_libraries(fe_stream_check_edcu_nds PRIVATE fe_stream_generator - nds2client::cxx) + nds2client::cxx + driver::shmem) install(TARGETS fe_stream_check_nds DESTINATION bin) diff --git a/src/fe_stream_test/fe_check.cc b/src/fe_stream_test/fe_check.cc index 18d0d6ddd50ecb0ef18368b3dd2b23c47bf4c06e..cd390ac3cd78a39ae9486614536817210bf6bd29 100644 --- a/src/fe_stream_test/fe_check.cc +++ b/src/fe_stream_test/fe_check.cc @@ -5,6 +5,7 @@ #include <deque> #include <iostream> #include <iterator> +#include <memory> #include <string> #include <vector> @@ -56,14 +57,15 @@ struct model { return 0; } - data = (volatile char*)shmem_open_segment( mbuf_name.c_str( ), - 64 * 1024 * 1024 ); + shmem_obj = std::make_shared< shmem::shmem >( mbuf_name.c_str( ), 64 ); + data = shmem_obj->mapping< char >( ); return data; } - std::string name; - std::string mbuf_name; - volatile char* data; + std::string name; + std::string mbuf_name; + std::shared_ptr< shmem::shmem > shmem_obj; + volatile char* data; }; struct config_t diff --git a/src/fe_stream_test/fe_multi_stream_test.cc b/src/fe_stream_test/fe_multi_stream_test.cc index 3b79849b66fd5e16d488f7df4b174f095230303e..e8d5d9d2772313563b04c1d7d3dd0cfddbc8e540 100644 --- a/src/fe_stream_test/fe_multi_stream_test.cc +++ b/src/fe_stream_test/fe_multi_stream_test.cc @@ -19,10 +19,7 @@ #include "gps.hh" -extern "C" { - -#include "../drv/rfm.c" -} +#include "drv/shmem.h" enum MODEL_STATUS { @@ -73,14 +70,15 @@ struct Options { Options( ) : models( ), ini_root( ), master_path( "master" ), - mbuf_name( "local_dc" ), mbuf_size_mb( 100 ), concentrate( true ), - show_help( false ) + mbuf_name( "local_dc" ), shmem_prefix( "mbuf://" ), + mbuf_size_mb( 100 ), concentrate( true ), show_help( false ) { } std::vector< ModelParams > models; std::string ini_root; std::string master_path; std::string mbuf_name; + std::string shmem_prefix; size_t mbuf_size_mb; bool concentrate; bool show_help; @@ -341,16 +339,17 @@ public: } void - open_rmpic( ) + open_rmpic( std::vector< shmem::shmem >& shmem_objs, + const std::string& shmem_prefix ) { if ( rmipc_ != 0 || status_ == MODEL_STATUS::STOPPED ) { return; } - std::string mbuf_name = name_ + "_daq"; - rmipc_ = reinterpret_cast< volatile char* >( findSharedMemorySize( - const_cast< char* >( mbuf_name.c_str( ) ), 64 ) ); - if ( rmipc_ == 0 ) + std::string mbuf_name = shmem_prefix + name_ + "_daq"; + shmem_objs.emplace_back( mbuf_name.c_str( ), 64 ); + rmipc_ = shmem_objs.back( ).mapping< char >( ); + if ( rmipc_ == nullptr ) { std::cerr << "Unable to open shmem buffer\n"; exit( 1 ); @@ -644,7 +643,7 @@ generate_models_rmpic( std::vector< ModelPtr >& models, volatile char* data = shmem + CDS_DAQ_NET_DATA_OFFSET; // The block size is the maximal block size * 2 (to allow for TP data) - volatile char* data_cur = data + cycle * ( DAQ_DCU_BLOCK_SIZE); + volatile char* data_cur = data + cycle * ( DAQ_DCU_BLOCK_SIZE ); volatile char* data_end = data_cur + ( DAQ_DCU_BLOCK_SIZE ); for ( int i = 0; i < cur_model.generators( ).size( ); ++i ) @@ -795,6 +794,8 @@ usage( const char* progname ) std::cout << "\t-M path - Path to the master file (will be overwritten) " "[master]\n"; std::cout << "\t-b name - Name of the output mbuf [local_dc]\n"; + std::cout << "\t-s shmem type - Shared memory type, mbuf:// or shm:// " + "(mbuf://)\n"; std::cout << "\t-m size_mb - Size in MB of the output mbuf [100]\n"; std::cout << "\t-k size_kb - Default data rate of each model in kB\n"; std::cout << "\t-R num - number of models to simulate [1-247]\n"; @@ -864,7 +865,7 @@ parse_arguments( int argc, char* argv[] ) int c; int model_data_size = 700 * 1024; - while ( ( c = getopt( argc, argv, "i:M:b:m:D:R:k:h:St:f:" ) ) != -1 ) + while ( ( c = getopt( argc, argv, "i:M:b:m:D:R:k:h:St:f:s:" ) ) != -1 ) { switch ( c ) { @@ -921,6 +922,9 @@ parse_arguments( int argc, char* argv[] ) case 'S': opts.concentrate = false; break; + case 's': + opts.shmem_prefix = optarg; + break; case 't': if ( opts.models.empty( ) ) { @@ -984,16 +988,6 @@ sum_channels( const size_t current, const ModelPtr& m ) return current + m->chan_count( ); } -/** - * @brief helper function to open up all the mbufs for models - * @param m model to open the mbuf for - */ -void -open_rmipc( const ModelPtr& m ) -{ - m->open_rmpic( ); -} - int main( int argc, char* argv[] ) { @@ -1044,16 +1038,17 @@ main( int argc, char* argv[] ) volatile daq_multi_cycle_header_t* ifo_header = 0; volatile char* ifo_data = 0; size_t data_size = 0; + std::vector< shmem::shmem > shmem_objs; if ( concentrate ) { - std::string shmem_sysname = opts.mbuf_name; + std::string shmem_sysname = opts.shmem_prefix + opts.mbuf_name; size_t buffer_size_mb = opts.mbuf_size_mb; size_t buffer_size = buffer_size_mb * 1024 * 1024; - shmem = reinterpret_cast< volatile char* >( findSharedMemorySize( - const_cast< char* >( shmem_sysname.c_str( ) ), buffer_size_mb ) ); - if ( shmem == 0 ) + shmem_objs.emplace_back( shmem_sysname.c_str( ), buffer_size_mb ); + shmem = shmem_objs.back( ).mapping< char >( ); + if ( shmem == nullptr ) { std::cerr << "Unable to open shmem buffer\n"; exit( 1 ); @@ -1069,7 +1064,11 @@ main( int argc, char* argv[] ) } else { - std::for_each( models.begin( ), models.end( ), open_rmipc ); + std::for_each( models.begin( ), + models.end( ), + [&opts, &shmem_objs]( const ModelPtr& m ) { + m->open_rmpic( shmem_objs, opts.shmem_prefix ); + } ); } GPS::gps_clock clock( 0 ); diff --git a/src/fe_stream_test/fe_stream_check.cc b/src/fe_stream_test/fe_stream_check.cc index aad5c5245ed02b53c0d178ca44c46aef0863cc3c..4a5ff749c8fc88af765423f811ded28090ec6a95 100644 --- a/src/fe_stream_test/fe_stream_check.cc +++ b/src/fe_stream_test/fe_stream_check.cc @@ -453,8 +453,9 @@ load_buffer( config_t& opts ) { int tries = 5; - volatile void* data = - shmem_open_segment( opts.mbuf_name.c_str( ), opts.mbuf_size ); + shmem::shmem shmem_obj( opts.mbuf_name.c_str( ), + opts.mbuf_size / ( 1024 * 1024 ) ); + volatile void* data = shmem_obj.mapping< void >( ); buffer.resize( opts.mbuf_size ); daq_multi_cycle_header_t* header = diff --git a/src/gds/awgtpman/CMakeLists.txt b/src/gds/awgtpman/CMakeLists.txt index 7f257922996973a21f72c4d5651ba8304569d890..112a9e48b1c554f32a3cc3776362f6b0ca834043 100644 --- a/src/gds/awgtpman/CMakeLists.txt +++ b/src/gds/awgtpman/CMakeLists.txt @@ -28,7 +28,7 @@ add_executable(awgtpman shared_memory.c ../../util/modelrate.c - ../../drv/shmem.c + #../../drv/shmem.c big_buffers.c awgtpman.h) target_include_directories(awgtpman PRIVATE @@ -45,6 +45,7 @@ target_link_libraries(awgtpman PRIVATE #testpoint_objs rawgapi_rpc rtestpoint_rpc + driver::shmem ) target_compile_definitions(awgtpman PUBLIC LIGO_GDS _ADVANCED_LIGO _AWG_RM MAX_CHNNAME_SIZE=60) diff --git a/src/gds/awgtpman/shared_memory.c b/src/gds/awgtpman/shared_memory.c index 6a9dc9fec499068eea1165cbc594a872b655a26d..9526dbe8c40b0c2b4c73a04b52813d9831f020b4 100644 --- a/src/gds/awgtpman/shared_memory.c +++ b/src/gds/awgtpman/shared_memory.c @@ -10,37 +10,45 @@ #include "shmem_all.h" #include "drv/shmem.h" -//AWG data shared memory +// AWG data shared memory -volatile AWG_DATA *shmemAwgData = NULL; +volatile AWG_DATA* shmemAwgData = NULL; -int OpenAwgDataSharedMemory(const char *model_name) +int +OpenAwgDataSharedMemory( const char* model_name ) { - char buff[256]; - strncpy(buff, model_name, sizeof(buff)); - buff[sizeof(buff)-1] = 0; - strncat(buff, SHMEM_AWG_SUFFIX, sizeof(buff)-1); - buff[sizeof(buff)-1] = 0; - shmemAwgData = (volatile AWG_DATA *)shmem_open_segment(buff, SHMEM_AWG_SIZE); - AWG_DATA_INIT(shmemAwgData); - return (shmemAwgData!=NULL) ? -1 : 0; + char buff[ 256 ]; + shmem_handle handle = NULL; + + strncpy( buff, model_name, sizeof( buff ) ); + buff[ sizeof( buff ) - 1 ] = 0; + strncat( buff, SHMEM_AWG_SUFFIX, sizeof( buff ) - 1 ); + buff[ sizeof( buff ) - 1 ] = 0; + handle = shmem_open( buff, SHMEM_AWG_SIZE_MB ); + shmemAwgData = shmem_mapping( handle ); + AWG_DATA_INIT( shmemAwgData ); + return ( shmemAwgData != NULL ) ? -1 : 0; } +// IO from IOP shared memory +volatile IO_MEM_DATA* ioMemData; -//IO from IOP shared memory -volatile IO_MEM_DATA *ioMemData; +// Pointers into the shared memory for the cycle and time (coming from the IOP +// (e.g. x00)) these are just convenient pointers into some part of ioMemData. +volatile unsigned int* ioMemDataCycle; +volatile unsigned int* ioMemDataGPS; +volatile unsigned int* ioMemDataRate_sps; -// Pointers into the shared memory for the cycle and time (coming from the IOP (e.g. x00)) -// these are just convenient pointers into some part of ioMemData. -volatile unsigned int *ioMemDataCycle; -volatile unsigned int *ioMemDataGPS; -volatile unsigned int *ioMemDataRate_sps; - -int OpenIoMemSharedMemory() +int +OpenIoMemSharedMemory( ) { - volatile void *iomem_base = (volatile AWG_DATA *)shmem_open_segment(SHMEM_IOMEM_NAME, SHMEM_IOMEM_SIZE); + volatile void* iomem_base = NULL; + shmem_handle handle = NULL; + + handle = shmem_open( SHMEM_IOMEM_NAME, SHMEM_IOMEM_SIZE_MB ); + iomem_base = shmem_mapping( handle ); - ioMemData = (volatile IO_MEM_DATA *)(iomem_base + IO_MEM_DATA_OFFSET); + ioMemData = (volatile IO_MEM_DATA*)( iomem_base + IO_MEM_DATA_OFFSET ); - return (ioMemData!=NULL) ? -1 : 0; + return ( ioMemData != NULL ) ? -1 : 0; } \ No newline at end of file diff --git a/src/include/drv/shmem.h b/src/include/drv/shmem.h index 0576a17817c3e3eb65fa4f429211a7bfee79fc1a..baa37edafb8f674c36b0bec68b325db61803fba3 100644 --- a/src/include/drv/shmem.h +++ b/src/include/drv/shmem.h @@ -2,17 +2,148 @@ #define LIGO_DRIVER_SHMEM_H #ifdef __cplusplus + +#include <string> + extern "C" { #endif -extern volatile void *findSharedMemory(char *sys_name); -extern volatile void *findSharedMemorySize(char *sys_name, int size); +extern volatile void* findSharedMemory( char* sys_name ); +extern volatile void* findSharedMemorySize( char* sys_name, int size ); + +extern int shmem_format_name( char* dest, const char* src, size_t n ); +// extern volatile void* shmem_open_segment(const char *sys_name, size_t +// req_size); + +#define SHMEM_MBUF_PREFIX "mbuf://" +#define SHMEM_POSIX_PREFIX "shm://" -extern int shmem_format_name(char *dest, const char *src, size_t n); -extern volatile void* shmem_open_segment(const char *sys_name, size_t req_size); +typedef void* shmem_handle; + +/** + * @brief given a name mbuf://name, shm://name, name, ... return the start of + * name + * @param input input name + * @param prefix (optional) gets a pointer to a copy of the prefix type + * @return The portion after any prefix value, or name if no prefix found. + */ +extern const char* shmem_name_parse( const char* input, const char** prefix ); + +/** + * @brief Open a shared memory segment + * @param name name of the segment + * @param size_mb size in megabytes 2**20 + * @return a handle != 0 on success + */ +extern shmem_handle shmem_open( const char* name, size_t size_mb ); +/** + * @brief wrap an existing block of memory, typically for part of + * testing/debugging + * @param data the data to wrap + * @param size_mb size of *data in megabytes 2**20 + * @param cleanup callback to cleanup/deallocate data when shmem_close called + * @return a handle != 0 + */ +extern shmem_handle shmem_wrap( volatile void* data, + size_t size_mb, + void ( *cleanup )( volatile void* ) ); +/** + * @brief Close a shared memory handle + * @param handle the handle + * @note safe to call with a null handle + */ +extern void shmem_close( shmem_handle handle ); +/** + * @brief Get the address of the memory mapping + * @param handle the handle + * @return null pointer or the mapped address + * @note safe to call with a null handle + */ +extern volatile void* shmem_mapping( shmem_handle handle ); + +extern shmem_handle shmem_wrap_memory_arbitrary( + volatile void* data, size_t size, void ( *cleanup )( volatile void* ) ); #ifdef __cplusplus } + +namespace shmem +{ + /** + * @brief a simple RAII wrapper around the shmem_... functions + */ + class shmem + { + public: + shmem( ) : handle_{ nullptr } + { + } + shmem( const char* name, size_t size_mb ) + : handle_{ shmem_open( name, size_mb ) } + { + } + shmem( volatile void* data, + size_t size_in_bytes, + void ( *cleanup )( volatile void* ) ) + : handle_{ shmem_wrap_memory_arbitrary( + data, size_in_bytes, cleanup ) } + { + } + shmem( shmem&& other ) noexcept : handle_{ other.handle_ } + { + other.handle_ = nullptr; + } + shmem( const shmem& ) = delete; + + shmem& + operator=( shmem&& other ) noexcept + { + if ( &other != this ) + { + shmem_close( handle_ ); + handle_ = other.handle_; + other.handle_ = nullptr; + } + return *this; + } + shmem& operator=( const shmem& ) = delete; + + ~shmem( ) + { + shmem_close( handle_ ); + } + + template < typename T > + volatile T* + mapping( ) + { + return reinterpret_cast< volatile T* >( shmem_mapping( handle_ ) ); + } + + private: + shmem_handle handle_; + }; + + inline void + parse_name( const std::string& input, + std::string& prefix, + std::string& name ) + { + if ( input.empty( ) ) + { + prefix = ""; + name = ""; + return; + } + const char* prefix_str = nullptr; + const char* parsed_name = + shmem_name_parse( input.c_str( ), &prefix_str ); + + name = parsed_name; + prefix = prefix_str; + } +} // namespace shmem + #endif #endif /* LIGO_DRIVER_SHMEM_H */ \ No newline at end of file diff --git a/src/ix_stream/dix_recv.c b/src/ix_stream/dix_recv.c index afac49367523c00178bff6cacc97b70eb96bd770..52fa7fed5e902eb672ceadf373338b232c546387 100644 --- a/src/ix_stream/dix_recv.c +++ b/src/ix_stream/dix_recv.c @@ -15,6 +15,7 @@ #include "../include/daq_core.h" #include "args.h" +#include "drv/shmem.h" #define __CDECL @@ -24,7 +25,6 @@ #define MY_DAT_OFFSET ( MY_DCU_OFFSET + 0xa000 ) #include "./dolphin_common.c" -extern void* findSharedMemorySize( char*, int ); static volatile int keepRunning = 1; @@ -138,8 +138,8 @@ int __CDECL max_data_size = max_data_size_mb * 1024 * 1024; // Attach to local shared memory - char* ifo = - (char*)findSharedMemorySize( (char*)dest_mbuf_name, max_data_size_mb ); + shmem_handle shmem_obj = shmem_open( dest_mbuf_name, max_data_size_mb ); + char* ifo = (char*)shmem_mapping( shmem_obj ); daq_multi_cycle_data_t* ifo_shm = (daq_multi_cycle_data_t*)ifo; // char *ifo_data = (char *)ifo + sizeof(daq_multi_cycle_header_t); char* ifo_data = (char*)&( ifo_shm->dataBlock[ 0 ] ); @@ -241,6 +241,7 @@ int __CDECL print_diags2( nsys, new_cycle, sendLength, ixDataBlock ); } while ( keepRunning ); + shmem_close( shmem_obj ); // we never exit except for timeout or being killed return 1; } diff --git a/src/ix_stream/dix_xmit.c b/src/ix_stream/dix_xmit.c index e75d33232499d7fb58fe0fd33eac8adfd64c5e05..97f66a4b0aa226777ca7ea3b1611dd18bddd8459 100644 --- a/src/ix_stream/dix_xmit.c +++ b/src/ix_stream/dix_xmit.c @@ -33,6 +33,7 @@ #include "simple_pv.h" #include "args.h" +#include "drv/shmem.h" #define __CDECL @@ -41,8 +42,6 @@ static int xmitDataOffset[ IX_BLOCK_COUNT ]; daq_multi_cycle_header_t* xmitHeader[ IX_BLOCK_COUNT ]; -extern void* findSharedMemorySize( char*, int ); - int do_verbose = 0; static volatile int keepRunning = 1; @@ -163,6 +162,7 @@ main( int argc, char** argv ) // Declare shared memory data variables daq_multi_cycle_header_t* ifo_header; + shmem_handle shmem_obj = NULL; char* ifo; char* ifo_data; int cycle_data_size; @@ -246,7 +246,8 @@ main( int argc, char** argv ) signal( SIGPIPE, sigpipeHandler ); // Get pointers to local DAQ mbuf - ifo = (char*)findSharedMemorySize( buffer_name, max_data_size_mb ); + shmem_obj = shmem_open( buffer_name, max_data_size_mb ); + ifo = (char*)shmem_mapping( shmem_obj ); ifo_header = (daq_multi_cycle_header_t*)ifo; ifo_data = (char*)ifo + sizeof( daq_multi_cycle_header_t ); @@ -628,6 +629,11 @@ main( int argc, char** argv ) simple_pv_server_destroy( &pcas_server ); + if ( shmem_obj ) + { + shmem_close( shmem_obj ); + } + // we never exit except for timeout or being killed exit( 1 ); } diff --git a/src/local_dc/CMakeLists.txt b/src/local_dc/CMakeLists.txt index 020ce2666b7dfcfc69bc00a7c0b58335489e22cf..d70be10d2db0d78a331ed14ef43288d8c466e251 100644 --- a/src/local_dc/CMakeLists.txt +++ b/src/local_dc/CMakeLists.txt @@ -3,14 +3,17 @@ add_executable(local_dc local_dc.c ${CMAKE_CURRENT_SOURCE_DIR}/../drv/rfm.c) target_link_libraries(local_dc PUBLIC args util - driver::gpsclock) + driver::gpsclock + driver::shmem) configure_file(test_local_dc.sh.in test_local_dc.sh @ONLY) +configure_file(test_local_dc_posix_shmem.sh.in test_local_dc_posix_shmem.sh @ONLY) configure_file(test_local_dc_stopped_model.sh.in test_local_dc_stopped_model.sh @ONLY) configure_file(test_local_dc_with_systab.sh.in test_local_dc_with_systab.sh @ONLY) add_executable(test_local_dc_unit_tests tests/test_local_dc_unit_tests.c) target_include_directories(test_local_dc_unit_tests PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(test_local_dc_unit_tests PUBLIC driver::shmem) add_executable(check_for_dcu_existence tests/check_for_dcu_existence.cc) target_include_directories(check_for_dcu_existence PUBLIC @@ -24,6 +27,10 @@ add_test(NAME "test_local_dc" COMMAND /bin/bash ./test_local_dc.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") +add_test(NAME "test_local_dc_posix_shmem" + COMMAND /bin/bash ./test_local_dc_posix_shmem.sh + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_local_dc_with_stopped_models" COMMAND /bin/bash ./test_local_dc_stopped_model.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") diff --git a/src/local_dc/local_dc.c b/src/local_dc/local_dc.c index dc89ee11cd9e9c47b37b6146f7ae346df1e037f6..2a443aeaecf91d0449337ab840053ff0a900f67e 100644 --- a/src/local_dc/local_dc.c +++ b/src/local_dc/local_dc.c @@ -27,6 +27,7 @@ #include "modelrate.h" #include "local_dc_utils.h" #include "drv/gpsclock.h" +#include "drv/shmem.h" #define MSG_BUF_SIZE sizeof( daq_dc_data_t ) @@ -34,6 +35,7 @@ #define MAX_NSYS 10 +static shmem_handle shmHandles[ 128 ]; static struct rmIpcStr* shmIpcPtr[ 128 ]; static char* shmDataPtr[ 128 ]; static struct cdsDaqNetGdsTpNum* shmTpTable[ 128 ]; @@ -45,9 +47,7 @@ daq_multi_dcu_data_t* ixDataBlock; daq_multi_cycle_header_t* ifo_header; char* zbuffer; -extern void* findSharedMemory( char* ); -extern void* findSharedMemorySize( char*, int ); - +const char* shmem_prefix[ DAQ_TRANSIT_MAX_DCU ]; char modelnames[ DAQ_TRANSIT_MAX_DCU ][ 64 ]; int do_verbose = 0; static volatile int keepRunning = 1; @@ -61,16 +61,17 @@ int daqStatBit[ 2 ]; // ********************************************************************************************** /// Get current GPS time from the symmetricom IRIG-B card unsigned long -symm_gps_time( unsigned long* frac, int* stt ) { - unsigned long t[3]; +symm_gps_time( unsigned long* frac, int* stt ) +{ + unsigned long t[ 3 ]; - if (symmetricom_fd >= 0) + if ( symmetricom_fd >= 0 ) { - ioctl(symmetricom_fd, IOCTL_SYMMETRICOM_TIME, &t); + ioctl( symmetricom_fd, IOCTL_SYMMETRICOM_TIME, &t ); } else { - gpsclock_time(t); + gpsclock_time( t ); } t[ 1 ] *= 1000; t[ 1 ] += t[ 2 ]; @@ -87,11 +88,12 @@ int symm_ok( ) { unsigned long req = 1; - if (symmetricom_fd >= 0) + if ( symmetricom_fd >= 0 ) { req = 0; - ioctl(symmetricom_fd, IOCTL_SYMMETRICOM_STATUS, &req); - fprintf(stderr, "Symmetricom status: %s\n", req ? "LOCKED" : "UNCLOCKED"); + ioctl( symmetricom_fd, IOCTL_SYMMETRICOM_STATUS, &req ); + fprintf( + stderr, "Symmetricom status: %s\n", req ? "LOCKED" : "UNCLOCKED" ); } return req; } @@ -99,11 +101,12 @@ symm_ok( ) // ******************************************************************************* // Wait for data ready from FE models // ******************************************************************************* -int waitNextCycle2( int nsys, - unsigned int cyclereq, // Cycle to wait for - int reset, // Request to reset model ipc shared memory - int dataRdy[], - struct rmIpcStr* ipcPtr[] ) // Pointer to IOP IPC shared memory +int +waitNextCycle2( int nsys, + unsigned int cyclereq, // Cycle to wait for + int reset, // Request to reset model ipc shared memory + int dataRdy[], + struct rmIpcStr* ipcPtr[] ) // Pointer to IOP IPC shared memory { int iopRunning = 0; int ii; @@ -331,9 +334,9 @@ send_to_local_memory( int nsys, int do_wait ) int lastCycle = 0; unsigned int nextCycle = 0; - int sync2iop = 1; - int status = 0; - int dataRdy[ MAX_NSYS ]; + int sync2iop = 1; + int status = 0; + int dataRdy[ MAX_NSYS ]; for ( ii = 0; ii < MAX_NSYS; ii++ ) { @@ -351,8 +354,8 @@ send_to_local_memory( int nsys, int do_wait ) for ( ii = 0; ii < nsys; ii++ ) dataRdy[ ii ] = 0; - status = waitNextCycle2( - nsys, nextCycle, sync2iop, dataRdy, shmIpcPtr ); + status = + waitNextCycle2( nsys, nextCycle, sync2iop, dataRdy, shmIpcPtr ); // status = waitNextCycle(nextCycle,sync2iop,shmIpcPtr[0]); if ( !status ) { @@ -404,6 +407,7 @@ int __CDECL main( int argc, char* argv[] ) { char systab_buffer[ 1024 ]; + char modelname_buffer[ 64 ]; int counter = 0; int nsys = 0; int ii = 0; @@ -421,15 +425,16 @@ int __CDECL char* buffer_name = NULL; const char* buffer_name_ = NULL; - const char* sysname_; - char* sysname; - const char* logfilename = NULL; - int len; - int iter; - int do_wait = 1; - int do_bothways; - const char* systab_fname = NULL; - args_handle arg_parser = NULL; + const char* sysname_; + char* sysname; + const char* logfilename = NULL; + int len; + int iter; + int do_wait = 1; + int do_bothways; + shmem_handle ifo_handle = NULL; + const char* systab_fname = NULL; + args_handle arg_parser = NULL; sysname = NULL; bzero( modelrates, sizeof( modelrates[ 0 ] ) * DAQ_TRANSIT_MAX_DCU ); @@ -600,6 +605,16 @@ int __CDECL for ( ii = 0; ii < nsys; ++ii ) { + shmHandles[ ii ] = NULL; + strcpy( modelname_buffer, modelnames[ ii ] ); + if ( !extract_shmem_type_from_name( modelname_buffer, + modelnames[ ii ], + 64, + &( shmem_prefix[ ii ] ) ) ) + { + fprintf( stderr, "Unknown shmem type on %s\n", modelname_buffer ); + exit( 1 ); + } extract_dcu_rate_from_name( modelnames[ ii ], &dcuid[ ii ], &modelrates[ ii ] ); trim_dcuid_and_rate_from_name( modelnames[ ii ] ); @@ -609,9 +624,11 @@ int __CDECL symmetricom_fd = open( "/dev/gpstime", O_RDWR | O_SYNC ); if ( symmetricom_fd < 0 ) { - if (gpsclock_init() < 0) { - perror("Unable to open /dev/gpstime or initialize the system based gpsclock"); - exit(1); + if ( gpsclock_init( ) < 0 ) + { + perror( "Unable to open /dev/gpstime or initialize the system " + "based gpsclock" ); + exit( 1 ); } } gps_ok = symm_ok( ); @@ -623,15 +640,19 @@ int __CDECL gps_stt ); // Find the shared memory locations for the various model names + // once we start opening shmem segments we must cleanup and cannot just + // exit. for ( ii = 0; ii < nsys; ii++ ) { char shmem_fname[ 128 ]; - sprintf( shmem_fname, "%s_daq", modelnames[ ii ] ); - void* dcu_addr = findSharedMemory( shmem_fname ); + sprintf( + shmem_fname, "%s%s_daq", shmem_prefix[ ii ], modelnames[ ii ] ); + shmHandles[ ii ] = shmem_open( shmem_fname, 64 ); + void* dcu_addr = (void*)shmem_mapping( shmHandles[ ii ] ); if ( dcu_addr == NULL ) { fprintf( stderr, "Can't map shmem\n" ); - exit( -1 ); + goto cleanup; } else { @@ -661,7 +682,7 @@ int __CDECL fprintf( stderr, "Unable to determine the rate of %s\n", modelnames[ ii ] ); - exit( 1 ); + goto cleanup; } } fprintf( stderr, @@ -672,7 +693,13 @@ int __CDECL } // Get pointers to local DAQ mbuf - ifo = (char*)findSharedMemorySize( buffer_name, max_data_size_mb ); + ifo_handle = shmem_open( buffer_name, max_data_size_mb ); + ifo = (void*)shmem_mapping( ifo_handle ); + if ( !ifo ) + { + fprintf( stderr, "Unable to open output buffer\n" ); + goto cleanup; + } ifo_header = (daq_multi_cycle_header_t*)ifo; ifo_data = (char*)ifo + sizeof( daq_multi_cycle_header_t ); cycle_data_size = ( max_data_size - sizeof( daq_multi_cycle_header_t ) ) / @@ -690,6 +717,13 @@ int __CDECL error = send_to_local_memory( nsys, do_wait ); } while ( error == 0 && keepRunning == 1 ); +cleanup: + shmem_close( ifo_handle ); + for ( ii = 0; ii < nsys; ii++ ) + { + shmem_close( shmHandles[ ii ] ); + } + if ( gds_tp_dir ) { free( (void*)gds_tp_dir ); diff --git a/src/local_dc/local_dc_utils.h b/src/local_dc/local_dc_utils.h index a870f7891289a1f00a93cb6ff6a7f94e78e154db..e22fb8e6d3b9b3a373c3f7d417273f4f32f07e83 100644 --- a/src/local_dc/local_dc_utils.h +++ b/src/local_dc/local_dc_utils.h @@ -12,6 +12,8 @@ #include <stdlib.h> #include <unistd.h> +#include "drv/shmem.h" + /*! * @brief extract a list of models to run from a systab file * @param f The file to read models from @@ -95,6 +97,57 @@ extract_models_from_table_file( const char* table, char* dest, int size ) return rc; } +/*! + * @brief given a system name, extract the shmem buffer type + name + * @param input the string to parse + * @param name_dest destination buffer for the model name + * @param name_dest_len size of model_name buffer + * @param shmem_type Pointer to a char* to hold the buffer type + * @return 1 on ok, 0 on error + */ +static int +extract_shmem_type_from_name( const char* input, + char* name_dest, + size_t name_dest_len, + const char** shmem_type ) +{ + static const char* mbuf = SHMEM_MBUF_PREFIX; + static const char* posix = SHMEM_POSIX_PREFIX; + const char* dummy = NULL; + + if ( shmem_type == NULL ) + { + shmem_type = &dummy; + } + if ( strncmp( input, SHMEM_MBUF_PREFIX, strlen( SHMEM_MBUF_PREFIX ) ) == 0 ) + { + strncpy( + name_dest, input + strlen( SHMEM_MBUF_PREFIX ), name_dest_len ); + *shmem_type = mbuf; + } + else if ( strncmp( input, + SHMEM_POSIX_PREFIX, + strlen( SHMEM_POSIX_PREFIX ) ) == 0 ) + { + strncpy( + name_dest, input + strlen( SHMEM_POSIX_PREFIX ), name_dest_len ); + *shmem_type = posix; + } + else if ( strstr( input, "://" ) != NULL ) + { + name_dest[ 0 ] = '\0'; + *shmem_type = NULL; + return 0; + } + else + { + strncpy( name_dest, input, name_dest_len ); + *shmem_type = mbuf; + } + name_dest[ name_dest_len - 1 ] = '\0'; + return 1; +} + /*! * @brief given the tail of a system name + dcu + rate, extract the dcu + rate. * @param input The text to parse must be either "model:dcuid:rate" or diff --git a/src/local_dc/test_local_dc_posix_shmem.sh.in b/src/local_dc/test_local_dc_posix_shmem.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..0d7c39093bc5e96b5dba8bd51752a0e720de104b --- /dev/null +++ b/src/local_dc/test_local_dc_posix_shmem.sh.in @@ -0,0 +1,105 @@ +#!/bin/bash + +CWD="@CMAKE_CURRENT_BINARY_DIR@" + +TDIR="" +PID_STREAM=0 +PID_LOCAL_DC=0 + +function kill_proc { + if [ $1 -gt 0 ]; then + echo "Closing process $1" + kill $1 + fi +} + +function cleanup { + if [ "x$TDIR" != "x" ]; then + if [ -d $TDIR ]; then + rm -rf "$TDIR" + fi + fi + for name in mod5_daq mod6_daq mod7_daq mod250_daq mod255_daq local_dc; do + if [ -e "/dev/shm/$name" ]; then + rm -f "/dev/shm/$name" + fi + done + kill_proc $PID_STREAM + kill_proc $PID_LOCAL_DC +} + +LOCAL_DC="$CWD/local_dc" +if [ ! -x "$LOCAL_DC" ]; then + echo "local_dc is required" + exit 1 +fi + +FE_STREAM_CHECK="$CWD/../fe_stream_test/fe_stream_check" +if [ ! -x "$FE_STREAM_CHECK" ]; then + echo "fe_stream_check is required" + exit 1 +fi + +FE_MULTI_STREAM_TEST="$CWD/../fe_stream_test/fe_multi_stream_test" +if [ ! -x "$FE_MULTI_STREAM_TEST" ]; then + echo "fe_multi_stream_test is required" + exit 1 +fi + +if [ ! -r /dev/gpstime ]; then + echo "the gpstime module must be loaded, configured, and accessible by this user" + exit 1 +fi + +if [ ! -r /dev/mbuf ]; then + echo "the mbuf module must be loaded, configured, and accessible by this user" + exit 1 +fi + +PYTHON="" +which python > /dev/null +if [ $? -eq 0 ]; then + PYTHON=`which python` +else + which python3 > /dev/null + if [ $? -eq 0 ]; then + PYTHON=`which python3` + else + echo "Cannot find python or python3" + exit 1 + fi +fi + +trap cleanup EXIT + +TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"` +mkdir "$TDIR/ini_files" +mkdir "$TDIR/logs" + +echo "Ini dir = $TDIR/ini_files" + +"$FE_MULTI_STREAM_TEST" -S -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -k 300 -s shm:// -D 5,6,7,250,255, > "$TDIR/logs/models" & +PID_STREAM=$! + +echo "Streamer PID = $PID_STREAM" + +sleep 1 + +"$LOCAL_DC" -m 100 -s "shm://mod5 shm://mod6 shm://mod7 shm://mod250 shm://mod255" -d "$TDIR/ini_files" -b "shm://local_dc" > "$TDIR/logs/local_dc" & +PID_LOCAL_DC=$! + +echo "Local_dc PID = $PID_LOCAL_DC" + +sleep 2 + +"$FE_STREAM_CHECK" -m shm://local_dc -s 100 -v -c "$TDIR/ini_files/mod5.ini" "$TDIR/ini_files/tpchn_mod5.par" \ +-c "$TDIR/ini_files/mod6.ini" "$TDIR/ini_files/tpchn_mod6.par" \ +-c "$TDIR/ini_files/mod7.ini" "$TDIR/ini_files/tpchn_mod7.par" \ +-c "$TDIR/ini_files/mod250.ini" "$TDIR/ini_files/tpchn_mod250.par" \ +-c "$TDIR/ini_files/mod255.ini" "$TDIR/ini_files/tpchn_mod255.par" +RESULT=$? + +exit $? +#echo "Press enter to continue..." +#DUMMY="" +#read DUMMY \ No newline at end of file diff --git a/src/local_dc/tests/check_for_dcu_existence.cc b/src/local_dc/tests/check_for_dcu_existence.cc index 491def3c5acc5d12fb0cf54c105ec6bbc8bc0259..edc1845eafaa577f9792e7ef7905493c192e62b1 100644 --- a/src/local_dc/tests/check_for_dcu_existence.cc +++ b/src/local_dc/tests/check_for_dcu_existence.cc @@ -142,8 +142,9 @@ main( int argc, char* argv[] ) auto expected_dcus = parse_dcu_list( dcu_expected_str ); auto prohibited_dcus = parse_dcu_list( dcu_prohibited_str ); - auto raw_buffer = shmem_open_segment( mbuf_name, size_mb * 1024 * 1024 ); - auto prev_cycle = get_cur_cycle( raw_buffer ); + shmem::shmem shmem_obj( mbuf_name, size_mb ); + auto* raw_buffer = shmem_obj.mapping< void >( ); + auto prev_cycle = get_cur_cycle( raw_buffer ); std::set< unsigned int > dcus_seen; diff --git a/src/local_dc/tests/test_local_dc_unit_tests.c b/src/local_dc/tests/test_local_dc_unit_tests.c index 052ab2b29f5473b4e4bbddaf6b6f28bde7ae679a..cf39539b6ba969e8b9834e705fa2746977cab3c9 100644 --- a/src/local_dc/tests/test_local_dc_unit_tests.c +++ b/src/local_dc/tests/test_local_dc_unit_tests.c @@ -4,6 +4,7 @@ #include "local_dc_utils.h" #include <stdio.h> +#include "drv/shmem.h" int test_extract_dcu_rate_from_name( const char* input, @@ -269,17 +270,87 @@ do_extract_models_from_table_tests_fail( ) } } +void do_extract_models_from_table_tests( ) { do_extract_models_from_table_tests_ok( ); do_extract_models_from_table_tests_fail( ); } +typedef struct shmem_type_test_case +{ + const char* input; + const char* expected_name; + const char* expected_type; + int expected_result; +} shmem_type_test_case; + +void +do_shmem_type_tests( ) +{ + int i = 0; + int failed = 0; + int rc = 0; + const char* shmem_type = NULL; + char name_buffer[ 128 ]; + shmem_type_test_case test_cases[] = { + { "h1iop1", "h1iop1", SHMEM_MBUF_PREFIX, 1 }, + { "mbuf://h1iop1", "h1iop1", SHMEM_MBUF_PREFIX, 1 }, + { "mbuf://h1iop1:52", "h1iop1:52", SHMEM_MBUF_PREFIX, 1 }, + { "shm://h1iop1", "h1iop1", SHMEM_POSIX_PREFIX, 1 }, + { "other://h1iop1", "", NULL, 0 }, + { "", "", NULL, 0 }, + }; + + i = 0; + do + { + failed = 0; + rc = extract_shmem_type_from_name( test_cases[ i ].input, + name_buffer, + sizeof( name_buffer ), + &shmem_type ); + if ( rc != test_cases[ i ].expected_result ) + { + failed = 1; + } + else + { + if ( strcmp( test_cases[ i ].expected_name, name_buffer ) != 0 ) + { + failed = 1; + } + if ( shmem_type == NULL || + strcmp( test_cases[ i ].expected_type, shmem_type ) != 0 ) + { + if ( shmem_type == NULL && + test_cases[ i ].expected_type != NULL ) + { + failed = 1; + } + } + } + if ( failed ) + { + fprintf( stderr, + "Failed test: '%s' '%s' '%s' rc(%d) != %d", + test_cases[ i ].input, + name_buffer, + shmem_type, + rc, + test_cases[ i ].expected_result ); + exit( 1 ); + } + i++; + } while ( strcmp( test_cases[ i ].input, "" ) != 0 ); +} + int main( int argc, char* argv[] ) { do_extract_tests( ); do_trim_tests( ); do_extract_models_from_table_tests( ); + do_shmem_type_tests( ); return 0; } \ No newline at end of file diff --git a/src/pub_sub_stream/CMakeLists.txt b/src/pub_sub_stream/CMakeLists.txt index 854b0fba46d4833dd3e5c596777e641ba0644064..c04e7f31b319344e6a4a9f4230fbeeafa75393ee 100644 --- a/src/pub_sub_stream/CMakeLists.txt +++ b/src/pub_sub_stream/CMakeLists.txt @@ -77,7 +77,8 @@ if (libcds-pubsub_FOUND) pv::simple_pv args cds::pub_sub - pub_sub_plugins) + pub_sub_plugins + driver::shmem) target_requires_cpp11(cds_pub_sub PUBLIC) # add_executable(cds_pub_sub_asan @@ -118,26 +119,46 @@ if (libcds-pubsub_FOUND) ${CMAKE_THREAD_LIBS_INIT}) configure_file(test_pub_sub_xmit_recv.sh.in test_pub_sub_xmit_recv.sh @ONLY) + configure_file(test_pub_sub_xmit_recv_posix_shmem.sh.in test_pub_sub_xmit_recv_posix_shmem.sh @ONLY) configure_file(test_pub_sub.sh.in test_pub_sub.sh @ONLY) + configure_file(test_pub_sub_posix_shmem.sh.in test_pub_sub_posix_shmem.sh @ONLY) configure_file(test_pub_sub_rmipc_to_daqm.sh.in test_pub_sub_rmipc_to_daqm.sh @ONLY) + configure_file(test_pub_sub_rmipc_to_daqm_posix_shmem.sh.in test_pub_sub_rmipc_to_daqm_posix_shmem.sh @ONLY) configure_file(test_pub_sub_daqm_to_daqm.sh.in test_pub_sub_daqm_to_daqm.sh @ONLY) + configure_file(test_pub_sub_daqm_to_daqm_posix_shmem.sh.in test_pub_sub_daqm_to_daqm_posix_shmem.sh @ONLY) add_test(NAME "test_pub_sub_xmit_recv" COMMAND /bin/bash ./test_pub_sub_xmit_recv.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_pub_sub_xmit_recv_posix_shmem" + COMMAND /bin/bash ./test_pub_sub_xmit_recv_posix_shmem.sh + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_pub_sub" COMMAND /bin/bash ./test_pub_sub.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_pub_sub_posix_shmem" + COMMAND /bin/bash ./test_pub_sub_posix_shmem.sh + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_pub_sub_rmipc_to_daqm" COMMAND /bin/bash ./test_pub_sub_rmipc_to_daqm.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_pub_sub_rmipc_to_daqm_posix_shmem" + COMMAND /bin/bash ./test_pub_sub_rmipc_to_daqm_posix_shmem.sh + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_pub_sub_daqm_to_daqm" COMMAND /bin/bash ./test_pub_sub_daqm_to_daqm.sh WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_pub_sub_daqm_to_daqm_posix_shmem" + COMMAND /bin/bash ./test_pub_sub_daqm_to_daqm_posix_shmem.sh + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") + add_test(NAME "test_buffer_entry" COMMAND test_buffer_entry) diff --git a/src/pub_sub_stream/plugins/pub_plugin_daq_m.cc b/src/pub_sub_stream/plugins/pub_plugin_daq_m.cc index f6980d451509df13d5af45fb4f38666ada8e7215..6df83c7785a54ebe2b0b0c8ab20dfcc04585270c 100644 --- a/src/pub_sub_stream/plugins/pub_plugin_daq_m.cc +++ b/src/pub_sub_stream/plugins/pub_plugin_daq_m.cc @@ -25,8 +25,8 @@ namespace cps_plugins public: DaqMPublisherInstance( const std::string& name, std::size_t size_in_mb ) - : dest_{ (daq_multi_cycle_data_t*)shmem_open_segment( - name.c_str( ), size_in_mb * 1024 * 1024 ) }, + : shmem_obj_( name.c_str( ), size_in_mb ), + dest_{ shmem_obj_.mapping< daq_multi_cycle_data_t >( ) }, size_in_mb_{ size_in_mb }, cycle_data_size_{ calculate_cycle_data_size( size_in_mb ) } @@ -73,9 +73,10 @@ namespace cps_plugins } private: - daq_multi_cycle_data_t* dest_; - std::size_t size_in_mb_; - unsigned int cycle_data_size_; + shmem::shmem shmem_obj_; + volatile daq_multi_cycle_data_t* dest_; + std::size_t size_in_mb_; + unsigned int cycle_data_size_; }; } // namespace detail @@ -106,7 +107,12 @@ namespace cps_plugins throw std::runtime_error( "Invalid publisher type passed to the daqm publisher" ); } - auto conn_str = address.substr( prefix( ).size( ) ); + + std::string conn_str; + std::string shmem_prefix; + + shmem::parse_name(address.substr( prefix( ).size( ) ), shmem_prefix, conn_str); + auto sep_index = conn_str.find( ':' ); auto name = conn_str; std::size_t buffer_size_mb = 100; @@ -117,6 +123,6 @@ namespace cps_plugins buffer_size_mb = std::stoi( size_str ); } return make_unique_ptr< detail::DaqMPublisherInstance >( - name, buffer_size_mb ); + shmem_prefix + name, buffer_size_mb ); } } // namespace cps_plugins \ No newline at end of file diff --git a/src/pub_sub_stream/plugins/sub_plugin_daq_m.cc b/src/pub_sub_stream/plugins/sub_plugin_daq_m.cc index 6c985f4ff20db6bbff3d766a35de4668eadaaa19..07eb8b75842bbb9f46a8217fa5db93a83dcb30f0 100644 --- a/src/pub_sub_stream/plugins/sub_plugin_daq_m.cc +++ b/src/pub_sub_stream/plugins/sub_plugin_daq_m.cc @@ -45,9 +45,9 @@ namespace cps_plugins std::size_t size_in_mb, pub_sub::SubDebugNotices& debug, pub_sub::SubHandler handler ) - : Subscription( ), - shmem_{ (daq_multi_cycle_data_t*)shmem_open_segment( - name.c_str( ), size_in_mb * 1024 * 1024 ) }, + : Subscription( ), shmem_obj_( name.c_str( ), size_in_mb ), + shmem_{ const_cast< daq_multi_cycle_data_t* >( + shmem_obj_.mapping< daq_multi_cycle_data_t >( ) ) }, handler_{ std::move( handler ) }, memory_arena_( 5 ), stopping_{ false }, th_{} { @@ -159,6 +159,7 @@ namespace cps_plugins } } + shmem::shmem shmem_obj_; daq_multi_cycle_data_t* shmem_; Arena memory_arena_; pub_sub::SubHandler handler_; @@ -204,7 +205,11 @@ namespace cps_plugins throw std::runtime_error( "Invalid subscription type passed the " "the daq memory subscriber" ); } - auto conn_str = address.substr( prefix( ).size( ) ); + std::string conn_str; + std::string shmem_prefix; + + shmem::parse_name(address.substr( prefix( ).size( ) ), shmem_prefix, conn_str); + auto sep_index = conn_str.find( ':' ); auto name = conn_str; std::size_t buffer_size_mb = 100; @@ -215,7 +220,7 @@ namespace cps_plugins buffer_size_mb = std::stoi( size_str ); } subscriptions_.emplace_back( make_unique_ptr< detail::DaqMSub >( - name, buffer_size_mb, debug_hooks, std::move( handler ) ) ); + shmem_prefix + name, buffer_size_mb, debug_hooks, std::move( handler ) ) ); return subscriptions_.back( )->sub_id( ); } diff --git a/src/pub_sub_stream/plugins/sub_plugin_rmipc.cc b/src/pub_sub_stream/plugins/sub_plugin_rmipc.cc index 6aa5b35ea38b8ee80fac2430e41007ac13aabffc..b4184439e4ce15264fd73343d376679fefcff067 100644 --- a/src/pub_sub_stream/plugins/sub_plugin_rmipc.cc +++ b/src/pub_sub_stream/plugins/sub_plugin_rmipc.cc @@ -71,7 +71,8 @@ namespace cps_plugins memory_arena_( 5 ), stopping_{ false }, th_{} { std::size_t i = 0; - size_t FE_MBUF_SIZE = 64*1024*1024; + size_t FE_MBUF_SIZE_MB = 64; + size_t FE_MBUF_SIZE = FE_MBUF_SIZE * 1024 * 1024; if ( !symmetricom_fd.get( ) ) { @@ -80,8 +81,10 @@ namespace cps_plugins for ( const auto& name : mbuf_names ) { std::string shmem_fname = name + "_daq"; - void* dcu_addr = (void*)shmem_open_segment( - shmem_fname.c_str( ), FE_MBUF_SIZE ); + shmObjs[ i ] = + shmem::shmem( shmem_fname.c_str( ), FE_MBUF_SIZE_MB ); + auto* dcu_addr = + const_cast< void* >( shmObjs[ i ].mapping< void >( ) ); if ( !dcu_addr ) { throw std::runtime_error( @@ -96,8 +99,13 @@ namespace cps_plugins (struct cdsDaqNetGdsTpNum*)( (char*)dcu_addr + CDS_DAQ_NET_GDS_TP_TABLE_OFFSET ); - auto status = get_model_rate_dcuid( - &modelrates[ i ], &dcuid[ i ], name.c_str( ), nullptr ); + + std::string shmem_prefix, raw_name; + shmem::parse_name( name, shmem_prefix, raw_name ); + auto status = get_model_rate_dcuid( &modelrates[ i ], + &dcuid[ i ], + raw_name.c_str( ), + nullptr ); if ( status != 0 || modelrates[ i ] == 0 ) { std::ostringstream os; @@ -151,8 +159,6 @@ namespace cps_plugins return req; } - - // ******************************************************************************* // Wait for data ready from FE models // ******************************************************************************* @@ -386,6 +392,7 @@ namespace cps_plugins } } + std::array< shmem::shmem, rmipc_max_subs > shmObjs; std::array< struct rmIpcStr*, rmipc_max_subs > shmIpcPtr; std::array< char*, rmipc_max_subs > shmDataPtr; std::array< struct cdsDaqNetGdsTpNum*, rmipc_max_subs > shmTpTable; diff --git a/src/pub_sub_stream/pub_xmit.cc b/src/pub_sub_stream/pub_xmit.cc index 77f9c7a5a75c72278b2e83ec85abea353884d149..ed7caed843765b8c158d05cb0f46f093e7969add 100644 --- a/src/pub_sub_stream/pub_xmit.cc +++ b/src/pub_sub_stream/pub_xmit.cc @@ -377,7 +377,8 @@ main( int argc, char* argv[] ) "the publisher\n" ); // Get pointers to local DAQ mbuf - ifo = (char*)findSharedMemorySize( (char*)buffer_name, max_data_size_mb ); + shmem::shmem shmem_obj( buffer_name, max_data_size_mb ); + ifo = const_cast< char* >( shmem_obj.mapping< char >( ) ); ifo_header = (daq_multi_cycle_header_t*)ifo; ifo_data = (char*)ifo + sizeof( daq_multi_cycle_header_t ); cycle_data_size = ( max_data_size - sizeof( daq_multi_cycle_header_t ) ) / diff --git a/src/pub_sub_stream/sub_recv_bufferred.cc b/src/pub_sub_stream/sub_recv_bufferred.cc index 6db90325f1552dbc738b3fcd0e521b33829d90ca..564a19421224e174d533a306716b1b10fb536aca 100644 --- a/src/pub_sub_stream/sub_recv_bufferred.cc +++ b/src/pub_sub_stream/sub_recv_bufferred.cc @@ -45,6 +45,7 @@ #include "dc_stats.hh" #include "message_filter.hh" +#include "drv/shmem.h" #define __CDECL @@ -53,10 +54,6 @@ #define MAX_FE_COMPUTERS 32 -extern "C" { -extern void* findSharedMemorySize( char*, int ); -} - int do_verbose = 0; struct thread_info @@ -268,8 +265,8 @@ public: data_recorder( const std::string shmem_name, int shmem_max_size_mb, dc_queue* next_stage ) - : shmem_ptr_( findSharedMemorySize( - const_cast< char* >( shmem_name.c_str( ) ), shmem_max_size_mb ) ), + : shmem_obj_( shmem_name.c_str( ), shmem_max_size_mb ), + shmem_ptr_( const_cast< char* >( shmem_obj_.mapping< char >( ) ) ), next_stage_{ next_stage } { ifo_header_ = (daq_multi_cycle_header_t*)shmem_ptr_; @@ -322,6 +319,7 @@ public: } private: + shmem::shmem shmem_obj_; void* shmem_ptr_; daq_multi_cycle_header_t* ifo_header_; char* ifo_data_; diff --git a/src/pub_sub_stream/test_pub_sub_daqm_to_daqm.sh.in b/src/pub_sub_stream/test_pub_sub_daqm_to_daqm.sh.in index 2b278548a46e10c5ee01400659bd8f0306c91251..68df3cc751aa9bbca62774718a39ae7a6f127b44 100644 --- a/src/pub_sub_stream/test_pub_sub_daqm_to_daqm.sh.in +++ b/src/pub_sub_stream/test_pub_sub_daqm_to_daqm.sh.in @@ -73,7 +73,7 @@ mkdir "$TDIR/logs" echo "Ini dir = $TDIR/ini_files" -"$FE_MULTI_STREAM_TEST" -b local_dc -m 64 -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -k 2048 -D 5,6,255 > "$TDIR/logs/models" & +"$FE_MULTI_STREAM_TEST" -b local_dc -m 100 -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -k 2048 -D 5,6,255 > "$TDIR/logs/models" & PID_STREAM=$! sleep 2 diff --git a/src/pub_sub_stream/test_pub_sub_daqm_to_daqm_posix_shmem.sh.in b/src/pub_sub_stream/test_pub_sub_daqm_to_daqm_posix_shmem.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..b60b8a514b5320d27a51fee0defee55524196c19 --- /dev/null +++ b/src/pub_sub_stream/test_pub_sub_daqm_to_daqm_posix_shmem.sh.in @@ -0,0 +1,98 @@ +#!/bin/bash + +CWD="@CMAKE_CURRENT_BINARY_DIR@" + +TDIR="" +PID_STREAM=0 +PID_PUB_SUB=0 + +function kill_proc { + if [ $1 -gt 0 ]; then + echo "Closing process $1" + kill $1 + fi +} + +function cleanup { + if [ "x$TDIR" != "x" ]; then + if [ -d $TDIR ]; then + rm -rf "$TDIR" + fi + fi + for name in pub_sub_test local_dc; do + if [ -e "/dev/shm/$name" ]; then + rm -f "/dev/shm/$name" + fi + done + kill_proc $PID_STREAM + kill_proc $PID_PUB_SUB +} + +PUB_SUB="$CWD/cds_pub_sub" +if [ ! -x "$PUB_SUB" ]; then + echo "cds_pub_sub is required" + exit 1 +fi + +FE_STREAM_CHECK="$CWD/../fe_stream_test/fe_stream_check" +if [ ! -x "$FE_STREAM_CHECK" ]; then + echo "fe_stream_check is required" + exit 1 +fi + +FE_MULTI_STREAM_TEST="$CWD/../fe_stream_test/fe_multi_stream_test" +if [ ! -x "$FE_MULTI_STREAM_TEST" ]; then + echo "fe_multi_stream_test is required" + exit 1 +fi + +if [ ! -r /dev/gpstime ]; then + echo "the gpstime module must be loaded, configured, and accessible by this user" + exit 1 +fi + +PYTHON="" +which python > /dev/null +if [ $? -eq 0 ]; then + PYTHON=`which python` +else + which python3 > /dev/null + if [ $? -eq 0 ]; then + PYTHON=`which python3` + else + echo "Cannot find python or python3" + exit 1 + fi +fi + +trap cleanup EXIT + +TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"` +mkdir "$TDIR/ini_files" +mkdir "$TDIR/logs" + +echo "Ini dir = $TDIR/ini_files" + +"$FE_MULTI_STREAM_TEST" -b local_dc -m 100 -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -k 2048 -D 5,6,255 -s shm:// > "$TDIR/logs/models" & +PID_STREAM=$! + +sleep 2 + +echo "Streamer PID = $PID_STREAM" + +GDS_TP_DIR="$TDIR/ini_files" +export GDS_TP_DIR +"$PUB_SUB" -i "daqm://shm://local_dc:100" -o "daqm://shm://pub_sub_test:100" > "$TDIR/logs/pub_xmit.log" & +PID_PUB_SUB=$! + +sleep 3 + +"$FE_STREAM_CHECK" -m shm://pub_sub_test -s 100 -v -c "$TDIR/ini_files/mod5.ini" "$TDIR/ini_files/tpchn_mod5.par" \ +-c "$TDIR/ini_files/mod6.ini" "$TDIR/ini_files/tpchn_mod6.par" \ +-c "$TDIR/ini_files/mod255.ini" "$TDIR/ini_files/tpchn_mod255.par" +RESULT=$? + +exit $? +#echo "Press enter to continue..." +#DUMMY="" +#read DUMMY \ No newline at end of file diff --git a/src/pub_sub_stream/test_pub_sub_posix_shmem.sh.in b/src/pub_sub_stream/test_pub_sub_posix_shmem.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..be93921666467e347c88f8d28e978e89f043f3b3 --- /dev/null +++ b/src/pub_sub_stream/test_pub_sub_posix_shmem.sh.in @@ -0,0 +1,111 @@ +#!/bin/bash + +CWD="@CMAKE_CURRENT_BINARY_DIR@" + +TDIR="" +PID_STREAM=0 +PID_PUB_SUB=0 +PID_SUB_RECV=0 + +function kill_proc { + if [ $1 -gt 0 ]; then + echo "Closing process $1" + kill $1 + fi +} + +function cleanup { + if [ "x$TDIR" != "x" ]; then + if [ -d $TDIR ]; then + rm -rf "$TDIR" + fi + fi + for name in mod5_daq mod6_daq mod255_daq sub_recv; do + if [ -e "/dev/shm/$name" ]; then + rm -f "/dev/shm/$name" + fi + done + kill_proc $PID_STREAM + kill_proc $PID_PUB_SUB + kill_proc $PID_SUB_RECV +} + +PUB_SUB="$CWD/cds_pub_sub" +if [ ! -x "$PUB_SUB" ]; then + echo "cds_pub_sub is required" + exit 1 +fi + +SUB_RECV="$CWD/cps_recv" +if [ ! -x "$SUB_RECV" ]; then + echo "cps_recv is required" + exit 1 +fi + +FE_STREAM_CHECK="$CWD/../fe_stream_test/fe_stream_check" +if [ ! -x "$FE_STREAM_CHECK" ]; then + echo "fe_stream_check is required" + exit 1 +fi + +FE_MULTI_STREAM_TEST="$CWD/../fe_stream_test/fe_multi_stream_test" +if [ ! -x "$FE_MULTI_STREAM_TEST" ]; then + echo "fe_multi_stream_test is required" + exit 1 +fi + +if [ ! -r /dev/gpstime ]; then + echo "the gpstime module must be loaded, configured, and accessible by this user" + exit 1 +fi + +PYTHON="" +which python > /dev/null +if [ $? -eq 0 ]; then + PYTHON=`which python` +else + which python3 > /dev/null + if [ $? -eq 0 ]; then + PYTHON=`which python3` + else + echo "Cannot find python or python3" + exit 1 + fi +fi + +trap cleanup EXIT + +TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"` +mkdir "$TDIR/ini_files" +mkdir "$TDIR/logs" + +echo "Ini dir = $TDIR/ini_files" + +"$FE_MULTI_STREAM_TEST" -S -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -k 2048 -D 5,6,255 -s shm:// > "$TDIR/logs/models" & +PID_STREAM=$! + +sleep 2 + +echo "Streamer PID = $PID_STREAM" + +GDS_TP_DIR="$TDIR/ini_files" +export GDS_TP_DIR +"$PUB_SUB" -i "rmipc://shm://mod5,shm://mod6,shm://mod255" -o "tcp://127.0.0.1:9000" > "$TDIR/logs/pub_xmit.log" & +PID_PUB_SUB=$! + +sleep 1 + +"$SUB_RECV" -b "shm://sub_recv" -m 100 -s "tcp://127.0.0.1:9000" > "$TDIR/logs/sub_recv.log" & +PID_SUB_RECV=$! + +sleep 2 + +"$FE_STREAM_CHECK" -m shm://sub_recv -s 100 -v -c "$TDIR/ini_files/mod5.ini" "$TDIR/ini_files/tpchn_mod5.par" \ +-c "$TDIR/ini_files/mod6.ini" "$TDIR/ini_files/tpchn_mod6.par" \ +-c "$TDIR/ini_files/mod255.ini" "$TDIR/ini_files/tpchn_mod255.par" +RESULT=$? + +exit $? +#echo "Press enter to continue..." +#DUMMY="" +#read DUMMY \ No newline at end of file diff --git a/src/pub_sub_stream/test_pub_sub_rmipc_to_daqm_posix_shmem.sh.in b/src/pub_sub_stream/test_pub_sub_rmipc_to_daqm_posix_shmem.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..2148dee2b79be12ddd065b7c8915d14dde68622d --- /dev/null +++ b/src/pub_sub_stream/test_pub_sub_rmipc_to_daqm_posix_shmem.sh.in @@ -0,0 +1,98 @@ +#!/bin/bash + +CWD="@CMAKE_CURRENT_BINARY_DIR@" + +TDIR="" +PID_STREAM=0 +PID_PUB_SUB=0 + +function kill_proc { + if [ $1 -gt 0 ]; then + echo "Closing process $1" + kill $1 + fi +} + +function cleanup { + if [ "x$TDIR" != "x" ]; then + if [ -d $TDIR ]; then + rm -rf "$TDIR" + fi + fi + for name in pub_sub_test mod5_daq mod6_daq mod255_daq; do + if [ -e "/dev/shm/$name" ]; then + rm -f "/dev/shm/$name" + fi + done + kill_proc $PID_STREAM + kill_proc $PID_PUB_SUB +} + +PUB_SUB="$CWD/cds_pub_sub" +if [ ! -x "$PUB_SUB" ]; then + echo "cds_pub_sub is required" + exit 1 +fi + +FE_STREAM_CHECK="$CWD/../fe_stream_test/fe_stream_check" +if [ ! -x "$FE_STREAM_CHECK" ]; then + echo "fe_stream_check is required" + exit 1 +fi + +FE_MULTI_STREAM_TEST="$CWD/../fe_stream_test/fe_multi_stream_test" +if [ ! -x "$FE_MULTI_STREAM_TEST" ]; then + echo "fe_multi_stream_test is required" + exit 1 +fi + +if [ ! -r /dev/gpstime ]; then + echo "the gpstime module must be loaded, configured, and accessible by this user" + exit 1 +fi + +PYTHON="" +which python > /dev/null +if [ $? -eq 0 ]; then + PYTHON=`which python` +else + which python3 > /dev/null + if [ $? -eq 0 ]; then + PYTHON=`which python3` + else + echo "Cannot find python or python3" + exit 1 + fi +fi + +trap cleanup EXIT + +TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"` +mkdir "$TDIR/ini_files" +mkdir "$TDIR/logs" + +echo "Ini dir = $TDIR/ini_files" + +"$FE_MULTI_STREAM_TEST" -S -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -k 2048 -D 5,6,255 -s shm:// > "$TDIR/logs/models" & +PID_STREAM=$! + +sleep 2 + +echo "Streamer PID = $PID_STREAM" + +GDS_TP_DIR="$TDIR/ini_files" +export GDS_TP_DIR +"$PUB_SUB" -i "rmipc://shm://mod5,shm://mod6,shm://mod255" -o "daqm://shm://pub_sub_test:100" > "$TDIR/logs/pub_xmit.log" & +PID_PUB_SUB=$! + +sleep 3 + +"$FE_STREAM_CHECK" -m shm://pub_sub_test -s 100 -v -c "$TDIR/ini_files/mod5.ini" "$TDIR/ini_files/tpchn_mod5.par" \ +-c "$TDIR/ini_files/mod6.ini" "$TDIR/ini_files/tpchn_mod6.par" \ +-c "$TDIR/ini_files/mod255.ini" "$TDIR/ini_files/tpchn_mod255.par" +RESULT=$? + +exit $? +#echo "Press enter to continue..." +#DUMMY="" +#read DUMMY \ No newline at end of file diff --git a/src/pub_sub_stream/test_pub_sub_xmit_recv_posix_shmem.sh.in b/src/pub_sub_stream/test_pub_sub_xmit_recv_posix_shmem.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..1802646408f8968149b6e51ca184724963df953e --- /dev/null +++ b/src/pub_sub_stream/test_pub_sub_xmit_recv_posix_shmem.sh.in @@ -0,0 +1,120 @@ +#!/bin/bash + +CWD="@CMAKE_CURRENT_BINARY_DIR@" + +TDIR="" +PID_STREAM=0 +PID_LOCAL_DC=0 +PID_PUB_XMIT=0 +PID_SUB_RECV=0 + +function kill_proc { + if [ $1 -gt 0 ]; then + echo "Closing process $1" + kill $1 + fi +} + +function cleanup { + if [ "x$TDIR" != "x" ]; then + if [ -d $TDIR ]; then + rm -rf "$TDIR" + fi + fi + for name in local_dc sub_recv mod5_daq mod6_daq mod255_daq; do + if [ -e "/dev/shm/$name" ]; then + rm -f "/dev/shm/$name" + fi + done + kill_proc $PID_STREAM + kill_proc $PID_LOCAL_DC + kill_proc $PID_PUB_XMIT + kill_proc $PID_SUB_RECV +} + +LOCAL_DC="$CWD/../local_dc/local_dc" +if [ ! -x "$LOCAL_DC" ]; then + echo "$LOCAL_DC" + exit 1 +fi + +PUB_XMIT="$CWD/cps_xmit" +if [ ! -x "$PUB_XMIT" ]; then + echo "cps_xmit is required" + exit 1 +fi + +SUB_RECV="$CWD/cps_recv" +if [ ! -x "$SUB_RECV" ]; then + echo "cps_recv is required" + exit 1 +fi + +FE_STREAM_CHECK="$CWD/../fe_stream_test/fe_stream_check" +if [ ! -x "$FE_STREAM_CHECK" ]; then + echo "fe_stream_check is required" + exit 1 +fi + +FE_MULTI_STREAM_TEST="$CWD/../fe_stream_test/fe_multi_stream_test" +if [ ! -x "$FE_MULTI_STREAM_TEST" ]; then + echo "fe_multi_stream_test is required" + exit 1 +fi + +if [ ! -r /dev/gpstime ]; then + echo "the gpstime module must be loaded, configured, and accessible by this user" + exit 1 +fi + +PYTHON="" +which python > /dev/null +if [ $? -eq 0 ]; then + PYTHON=`which python` +else + which python3 > /dev/null + if [ $? -eq 0 ]; then + PYTHON=`which python3` + else + echo "Cannot find python or python3" + exit 1 + fi +fi + +trap cleanup EXIT + +TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"` +mkdir "$TDIR/ini_files" +mkdir "$TDIR/logs" + +echo "Ini dir = $TDIR/ini_files" + +"$FE_MULTI_STREAM_TEST" -S -i "$TDIR/ini_files" -M "$TDIR/ini_files/master" -k 2048 -D 5,6,255 -s shm:// > "$TDIR/logs/models" & +PID_STREAM=$! + +sleep 2 + +echo "Streamer PID = $PID_STREAM" + +"$LOCAL_DC" -m 100 -s "shm://mod5 shm://mod6 shm://mod255" -d "$TDIR/ini_files" -b "shm://local_dc" > "$TDIR/logs/local_dc" & +PID_LOCAL_DC=$! + +sleep 1 + +"$PUB_XMIT" -b "shm://local_dc" -m 100 -p "tcp://127.0.0.1:9000" > "$TDIR/logs/pub_xmit.log" & +PID_PUB_XMIT=$! + +"$SUB_RECV" -b "shm://sub_recv" -m 100 -s "tcp://127.0.0.1:9000" > "$TDIR/logs/sub_recv.log" & +PID_SUB_RECV=$! + +sleep 2 + +"$FE_STREAM_CHECK" -m shm://sub_recv -s 100 -v -c "$TDIR/ini_files/mod5.ini" "$TDIR/ini_files/tpchn_mod5.par" \ +-c "$TDIR/ini_files/mod6.ini" "$TDIR/ini_files/tpchn_mod6.par" \ +-c "$TDIR/ini_files/mod255.ini" "$TDIR/ini_files/tpchn_mod255.par" +RESULT=$? + +exit $? +#echo "Press enter to continue..." +#DUMMY="" +#read DUMMY \ No newline at end of file diff --git a/src/shmem/shmem_awg.h b/src/shmem/shmem_awg.h index 2747ee7d8518b21dd48355a23f725a7c98169346..3801f496544de054d3149da7a2cf219cfdd80bdd 100644 --- a/src/shmem/shmem_awg.h +++ b/src/shmem/shmem_awg.h @@ -4,8 +4,9 @@ #include "awg_data.h" #define SHMEM_AWG_SUFFIX "_awg" -#define SHMEM_AWG_SIZE (64*(1024)*(1024)) +#define SHMEM_AWG_SIZE_MB 64 +#define SHMEM_AWG_SIZE ( SHMEM_AWG_SIZE_MB * ( 1024 ) * ( 1024 ) ) #define SHMEM_AWG_STRUCT AWG_DATA; -#endif //SHMEM_AWG_H +#endif // SHMEM_AWG_H diff --git a/src/shmem/shmem_iomem.h b/src/shmem/shmem_iomem.h index 702065bef72e80eb3c0b4c5db928deef33bfa915..1e65fe1728d3f32921e24dc3cf9e31e09ee342bc 100644 --- a/src/shmem/shmem_iomem.h +++ b/src/shmem/shmem_iomem.h @@ -8,7 +8,8 @@ #include "drv/cdsHardware.h" #define SHMEM_IOMEM_NAME "ipc" -#define SHMEM_IOMEM_SIZE (32*1024*1024) +#define SHMEM_IOMEM_SIZE_MB 32 +#define SHMEM_IOMEM_SIZE ( SHMEM_IOMEM_SIZE_MB * 1024 * 1024 ) #define SHMEM_IOMEM_STRUCT IO_MEM_DATA #endif // DAQD_TRUNK_SHMEM_IOMEM_H