diff --git a/src/zmq_stream/CMakeLists.txt b/src/zmq_stream/CMakeLists.txt index c43178a1e1f6a841895e0b180d7efc23b598749e..47d8bcadd2702c0fb29eeecad7426ca6d13b07b1 100644 --- a/src/zmq_stream/CMakeLists.txt +++ b/src/zmq_stream/CMakeLists.txt @@ -17,13 +17,8 @@ target_include_directories(zmq_transport PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../i target_include_directories(zmq_transport PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(zmq_transport PRIVATE zmq4::zmq) -#add_library(zmq_dc_recv STATIC zmq_dc_recv.c zmq_dc_recv.h) -#target_include_directories(zmq_dc_recv PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_dc_recv PUBLIC zmq4::zmq ${CMAKE_THREAD_LIBS_INIT}) -#add_library(zmq::dc_recv ALIAS zmq_dc_recv) - -add_executable(zmq_xmit zmq_fe.c dc_utils.c) -target_link_libraries(zmq_xmit PUBLIC driver::shmem zmq_transport zmq4::zmq zmq::simple_pv ${CMAKE_THREAD_LIBS_INIT}) +add_executable(zmq_xmit zmq_xmit.c dc_utils.c) +target_link_libraries(zmq_xmit PUBLIC driver::shmem zmq_transport zmq4::zmq ${CMAKE_THREAD_LIBS_INIT}) add_executable(zmq_recv zmq_recv.cc dc_utils.c) # target_compile_options(zmq_rcv_ix_xmit_cxx PUBLIC -fsanitize=address) @@ -38,69 +33,12 @@ target_link_libraries(zmq_recv PUBLIC ${CMAKE_THREAD_LIBS_INIT} # -no-pie ) -install(TARGETS zmq_recv DESTINATION bin) - -#add_executable(zmq_multi_stream zmq_multi_stream.c ../drv/rfm.c ${CMAKE_CURRENT_SOURCE_DIR}/../include/daq_core.h) -#target_include_directories(zmq_multi_stream PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_multi_stream PUBLIC zmq4::zmq) -#target_compile_options(zmq_multi_stream PUBLIC -std=gnu99) - - -#add_executable(zmq_threads zmq_threads.c dc_utils.c) -#target_include_directories(zmq_threads PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_threads PUBLIC -# driver::shmem -# zmq_transport -# zmq4::zmq -# zmq::simple_pv -# ${CMAKE_THREAD_LIBS_INIT}) -#target_compile_options(zmq_threads PUBLIC -std=gnu99) - -#add_executable(zmq_dc zmq_dc.cc) -#target_include_directories(zmq_dc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_dc PUBLIC zmq_dc_recv zmq4::zmq ${CMAKE_THREAD_LIBS_INIT}) -#target_compile_options(zmq_dc PUBLIC ${CPP11_FLAG}) -#add_executable(zmq_recv zmq_recv.cc ../drv/rfm.c ${CMAKE_CURRENT_SOURCE_DIR}/../include/daq_core.h) -#target_include_directories(zmq_recv PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_recv PUBLIC zmq_dc_recv zmq4::zmq driver::shmem ${CMAKE_THREAD_LIBS_INIT}) -#if (${CXX_HAS_ATOMIC}) -#else (${CXX_HAS_ATOMIC}) -# target_compile_definitions(zmq_recv PUBLIC "NO_STD_ATOMIC=1") -#endif (${CXX_HAS_ATOMIC}) -#target_compile_options(zmq_recv PUBLIC ${CPP11_FLAG}) +configure_file(test_zmq_xmit_recv.sh.in test_zmq_xmit_recv.sh @ONLY) -#add_executable(zmq_proxy zmq_proxy.c) -#target_include_directories(zmq_proxy PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_proxy PUBLIC zmq4::zmq) -#target_compile_options(zmq_proxy PUBLIC -std=gnu99) - -#add_executable(zmq_proxy_client zmq_proxy_client.c) -#target_include_directories(zmq_proxy_client PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_proxy_client PUBLIC zmq4::zmq) -#target_compile_options(zmq_proxy_client PUBLIC -std=gnu99) - -#add_executable(zmq_rcv_from_dc zmq_rcv_from_dc.c) -#target_include_directories(zmq_rcv_from_dc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -#target_link_libraries(zmq_rcv_from_dc PUBLIC zmq4::zmq) -#target_compile_options(zmq_rcv_from_dc PUBLIC -std=gnu99) - -if (${CXX_HAS_ATOMIC}) -# don't even try building these on pre c++11 compilers. -add_executable(zmq_daq_test_data_firehose tests/zmq_firehose_pub.cc tests/zmq_firehose_common.cc) -target_include_directories(zmq_daq_test_data_firehose PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -target_include_directories(zmq_daq_test_data_firehose PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/tests) -target_link_libraries(zmq_daq_test_data_firehose PUBLIC zmq4::zmq) -target_requires_cpp11(zmq_daq_test_data_firehose PUBLIC) -#target_compile_features(zmq_daq_test_data_firehose PUBLIC cxx_defaulted_functions) - -add_executable(zmq_daq_test_data_firehose_recv tests/zmq_firehose_sub.cc tests/zmq_firehose_common.cc) -target_include_directories(zmq_daq_test_data_firehose_recv PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include) -target_include_directories(zmq_daq_test_data_firehose_recv PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/tests) -target_link_libraries(zmq_daq_test_data_firehose_recv PUBLIC zmq4::zmq) -target_requires_cpp11(zmq_daq_test_data_firehose_recv PUBLIC) -#target_compile_features(zmq_daq_test_data_firehose_recv PUBLIC cxx_defaulted_functions) - -endif (${CXX_HAS_ATOMIC}) +add_test(NAME "test_zmq_xmit_recv" + COMMAND /bin/bash ./test_zmq_xmit_recv.sh + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}") +install(TARGETS zmq_recv DESTINATION bin) install(TARGETS zmq_xmit DESTINATION bin) diff --git a/src/zmq_stream/Makefile b/src/zmq_stream/Makefile index d8de21dac76d9c8fb5c3a66950cc75d780510320..d51a4c4656cd8b85cf178bc31a61da3b54083470 100644 --- a/src/zmq_stream/Makefile +++ b/src/zmq_stream/Makefile @@ -15,7 +15,7 @@ TARGETS=zmq_xmit #zmq_multi_stream: zmq_multi_stream.o rfm.o #zmqms: zmqms.o rfm.o -zmq_xmit: zmq_fe.o rfm.o zmq_transport.o dc_utils.o simple_pv.o +zmq_xmit: zmq_xmit.o rfm.o zmq_transport.o dc_utils.o simple_pv.o #gdstp_test: gdstp_test.o rfm.o #zmq_multi_rcvr: zmq_multi_rcvr.o #zmq_threads: zmq_threads.o rfm.o zmq_transport.o dc_utils.o @@ -38,7 +38,7 @@ LDFLAGS = -L/usr/local/lib -lzmq -lpthread .PHONY: clean clean: - rm -r *.o zmq_multi_stream zmq_multi_rcvr zmqms zmq_fe zmq_xmit + rm -r *.o zmq_multi_stream zmq_multi_rcvr zmqms zmq_xmit .PHONY: distclean distclean: clean diff --git a/src/zmq_stream/test_zmq_xmit_recv.sh.in b/src/zmq_stream/test_zmq_xmit_recv.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..0af7fdb977555620f63d386529329b8773f340e1 --- /dev/null +++ b/src/zmq_stream/test_zmq_xmit_recv.sh.in @@ -0,0 +1,124 @@ +#!/bin/bash + +CWD="@CMAKE_CURRENT_BINARY_DIR@" + +TDIR="" +PID_STREAM0=0 +PID_STREAM1=0 +PID_STREAM2=0 +PID_LOCAL_DC=0 +PID_ZMQ_XMIT=0 +PID_ZMQ_RECV=0 + +function kill_proc { + if [ $1 -gt 0 ]; then + echo "Closing process $1" + kill $1 + fi +} + +function cleanup { + if [ "x$TDIR" != "x" ]; then + if [ -d $TDIR ]; then + rm -rf "$TDIR" + fi + fi + kill_proc $PID_STREAM0 + kill_proc $PID_STREAM1 + kill_proc $PID_STREAM2 + kill_proc $PID_LOCAL_DC + kill_proc $PID_ZMQ_XMIT + kill_proc $PID_ZMQ_RECV +} + +LOCAL_DC="$CWD/../local_dc/local_dc" +if [ ! -x "$LOCAL_DC" ]; then + echo "$LOCAL_DC" + exit 1 +fi + +ZMQ_XMIT="$CWD/zmq_xmit" +if [ ! -x "$ZMQ_XMIT" ]; then + echo "zmq_emit is required" + exit 1 +fi + +ZMQ_RECV="$CWD/zmq_recv" +if [ ! -x "$ZMQ_RECV" ]; then + echo "zmq_recv is required" + exit 1 +fi + +FE_STREAM_CHECK="$CWD/../fe_stream_test/fe_stream_check" +if [ ! -x "$FE_STREAM_CHECK" ]; then + echo "fe_stream_check is required" + exit 1 +fi + +FE_STREAM_TEST="$CWD/../fe_stream_test/fe_stream_test" +if [ ! -x "$FE_STREAM_TEST" ]; then + echo "fe_stream_test is required" + exit 1 +fi + +if [ ! -w /dev/gpstime ]; then + echo "the gpstime module must be loaded, configured, and accessible by this user" + exit 1 +fi + +if [ ! -w /dev/mbuf ]; then + echo "the mbuf module must be loaded, configured, and accessible by this user" + exit 1 +fi + +PYTHON="" +which python > /dev/null +if [ $? -eq 0 ]; then + PYTHON=`which python` +else + which python3 > /dev/null + if [ $? -eq 0 ]; then + PYTHON=`which python3` + else + echo "Cannot find python or python3" + exit 1 + fi +fi + +trap cleanup EXIT + +TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"` +mkdir "$TDIR/ini_files" +mkdir "$TDIR/logs" + +echo "Ini dir = $TDIR/ini_files" + +"$FE_STREAM_TEST" -d 50 -r 1 -s x1iop1 -i "$TDIR/ini_files" > "$TDIR/logs/iop1" & +PID_STREAM0=$! +"$FE_STREAM_TEST" -d 61 -r 3 -s x1mod1 -i "$TDIR/ini_files" > "$TDIR/logs/mod1" & +PID_STREAM1=$! +"$FE_STREAM_TEST" -d 70 -r 3 -s x1mod2 -i "$TDIR/ini_files" > "$TDIR/logs/mod2" & +PID_STREAM2=$! + +echo "Streamer PIDS = $PID_STREAM0 $PID_STREAM1 $PID_STREAM2" + +"$LOCAL_DC" -m 100 -s "x1iop1 x1mod1 x1mod2" -d "$TDIR/ini_files" -b "local_dc" > "$TDIR/logs/local_dc" & +PID_LOCAL_DC=$! + +sleep 1 + +"$ZMQ_XMIT" -b "local_dc" -m 100 -e lo > "$TDIR/logs/zmq_xmit.log" & +PID_ZMQ_XMIT=$! + +"$ZMQ_RECV" -b "zmq_recv" -m 100 -s localhost > "$TDIR/logs/zmq_recv.log" & +PID_ZMQ_RECV=$! + +"$FE_STREAM_CHECK" -m zmq_recv -s 100 -v -c "$TDIR/ini_files/x1iop1.ini" "$TDIR/ini_files/tpchn_x1iop1.par" \ +-c "$TDIR/ini_files/x1mod1.ini" "$TDIR/ini_files/tpchn_x1mod1.par" \ +-c "$TDIR/ini_files/x1mod2.ini" "$TDIR/ini_files/tpchn_x1mod2.par" +RESULT=$? + +exit $? +#echo "Press enter to continue..." +#DUMMY="" +#read DUMMY \ No newline at end of file diff --git a/src/zmq_stream/tests/firehose_structs.hh b/src/zmq_stream/tests/firehose_structs.hh deleted file mode 100644 index bd6914a3f84f515e441861888217020068e3a6a3..0000000000000000000000000000000000000000 --- a/src/zmq_stream/tests/firehose_structs.hh +++ /dev/null @@ -1,22 +0,0 @@ -// -// Created by jonathan.hanks on 7/26/17. -// - -#ifndef DAQD_TRUNK_FIREHOSE_DCU_HH -#define DAQD_TRUNK_FIREHOSE_DCU_HH - -#include <string> - -class FirehoseGenerator { - -}; - -class FirehoseDCU { -public: -private: - int id_; - - -}; - -#endif //DAQD_TRUNK_FIREHOSE_DCU_HH diff --git a/src/zmq_stream/tests/zmq_firehose_common.cc b/src/zmq_stream/tests/zmq_firehose_common.cc deleted file mode 100644 index 76e4b163231310847135f2e3e992313196a1b056..0000000000000000000000000000000000000000 --- a/src/zmq_stream/tests/zmq_firehose_common.cc +++ /dev/null @@ -1,31 +0,0 @@ -// -// Created by jonathan.hanks on 7/27/17. -// - -#include <time.h> -#include <unistd.h> - -extern "C" { - -void get_time(long &gps, long &gps_n) { - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - gps = static_cast<long>(ts.tv_sec); - gps_n = static_cast<long>(ts.tv_nsec); -} - -void wait_for(long gps, long gps_n) { - if (gps == 0) { - long dummy; - get_time(gps, dummy); - } - while (true) { - long cur_gps, cur_gps_n; - get_time(cur_gps, cur_gps_n); - if (gps < cur_gps) break; - if (gps == cur_gps && gps_n <= cur_gps_n) break; - usleep(1); - } -} - -} \ No newline at end of file diff --git a/src/zmq_stream/tests/zmq_firehose_common.hh b/src/zmq_stream/tests/zmq_firehose_common.hh deleted file mode 100644 index cc2d82915718b93121c3ac44582fc9e49094dd2e..0000000000000000000000000000000000000000 --- a/src/zmq_stream/tests/zmq_firehose_common.hh +++ /dev/null @@ -1,19 +0,0 @@ -// -// Created by jonathan.hanks on 7/27/17. -// - -#ifndef DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H -#define DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H - -#ifdef __cplusplus -extern "C" { -#endif - -extern void get_time(long &gps, long &gps_n); -extern void wait_for(long gps, long gps_n); - -#ifdef __cplusplus -} -#endif - -#endif //DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H diff --git a/src/zmq_stream/tests/zmq_firehose_pub.cc b/src/zmq_stream/tests/zmq_firehose_pub.cc deleted file mode 100644 index 45b708d50ea119e816cd89c58e61832bb4bfc1e4..0000000000000000000000000000000000000000 --- a/src/zmq_stream/tests/zmq_firehose_pub.cc +++ /dev/null @@ -1,257 +0,0 @@ -// -// Created by jonathan.hanks on 7/26/17. -// -#include <algorithm> -#include <array> -#include <cmath> -#include <ctime> -//#include <chrono> -#include <iostream> -#include <random> -#include <sstream> -#include <string> -#include <vector> - -#include <zmq.hpp> -#include "../zmq_daq.h" - -#include "firehose_structs.hh" -#include "zmq_firehose_common.hh" - -struct Config { - std::string bind_point; - std::string master_filename; - int dcu_count; - int chans_per_dcu; - int max_generators; - std::mt19937_64::result_type random_seed; - size_t data_size; - int rate_limit; - int recovery_ms; - int threads; - int chunk_size; - bool verbose; - bool extra_verbose; - bool zero_copy; - - Config(): bind_point{"tcp://*:5555"}, - master_filename{"master_file"}, - dcu_count{5}, chans_per_dcu {5}, - max_generators{50}, - random_seed{static_cast<std::mt19937_64::result_type >(std::time(nullptr))}, - data_size{1024*1024*64}, - rate_limit{0}, - recovery_ms{10000}, - threads{1}, - chunk_size{100*1024*1024}, - verbose{false}, - extra_verbose{false}, - zero_copy{false} - {} - Config(const Config& other) = default; - Config(Config&& other) = default; - Config& operator=(const Config& other) = default; - Config& operator=(Config&& other) = default; -}; - -Config parse_args(int argc, char **argv) { - Config cfg{}; - for (int i = 1; i < argc; ++i) { - std::string arg{argv[i]}; - std::string next_arg; - bool has_next = false; - if (i +1 < argc) { - next_arg = argv[i+1]; - has_next = true; - } - if (arg == "--bind") { - if (has_next) { - cfg.bind_point = next_arg; - ++i; - } - } else if (arg == "--size") { - if (has_next) { - std::istringstream is{next_arg}; - is >> cfg.data_size; - ++i; - } - } else if (arg == "--rate-limit") { - if (has_next) { - std::istringstream is{next_arg}; - is >> cfg.rate_limit; - ++i; - } - std::cout << "rate limit = " << cfg.rate_limit << std::endl; - } else if (arg == "--recovery-ms") { - if (has_next) { - std::istringstream is{next_arg}; - is >> cfg.recovery_ms; - ++i; - } - } else if (arg == "--threads") { - if (has_next) { - std::istringstream is{next_arg}; - is >> cfg.threads; - ++i; - } - } else if (arg == "-v" || arg == "--verbose") { - cfg.verbose = true; - } else if (arg == "-vv") { - cfg.verbose = true; - cfg.extra_verbose = true; - } else if (arg == "-z" || arg == "--zero-copy") { - cfg.zero_copy = true; - } else if (arg == "--chunk") { - if (has_next) { - std::istringstream is{next_arg}; - is >> cfg.chunk_size; - ++i; - } - } else { - std::cerr << "Unknown argument " << arg << std::endl; - } - } - return cfg; -}; - -void simple_send_loop(zmq::socket_t& publisher, const Config& config) { - if (config.data_size < 1024) { - std::cerr << "Data size is too small, please try at least 1KB" << std::endl; - return; - } - const int segments = 16; - const int max_gps_n = 1000 * 1000 * 1000; - const int step = max_gps_n/segments; - - long gps=0; - long gps_n=0; - - if (config.verbose) { - std::cout << "Sending " << config.data_size * 16 << " bytes in 1/16s chunks" << std::endl; - } - - // initialize a full seconds worth of buffers - std::array<std::vector<char>, segments> buffers; - { - // just give each segment a different value - char val = 0; - for (auto& entry:buffers) { - entry.resize(config.data_size, val); - ++val; - } - } - - get_time(gps, gps_n); - gps++; - gps_n = 0; - int cur_segment = 0; - - while(true) { - wait_for(gps, gps_n); - - int parts = 0; - size_t total_size = 0; - - if (config.zero_copy) { - zmq::message_t msg(buffers[cur_segment].data(), config.data_size, nullptr, nullptr); - long* tmp = reinterpret_cast<long*>(buffers[cur_segment].data()); - tmp[0] = gps; - tmp[1] = gps_n; - publisher.send(msg); - parts = 1; - total_size = config.data_size; - } else { - size_t cur = 0; - const size_t chunk_size = config.chunk_size; - while (cur < config.data_size) { - size_t end = cur + chunk_size; - bool last_message = false; - if (end >= config.data_size) { - last_message = true; - end = config.data_size; - } - size_t message_size = end - cur; - total_size += message_size; - zmq::message_t msg(message_size); - char *data = reinterpret_cast<char *>(msg.data()); - std::fill(data, data + message_size, static_cast<char>(cur_segment)); - if (cur == 0) { - long *tmp = reinterpret_cast<long *>(data); - tmp[0] = gps; - tmp[1] = gps_n; - } - publisher.send(msg, (last_message ? 0 : ZMQ_SNDMORE)); - if (config.extra_verbose) - std::cerr << "\t" << message_size << "-" << (last_message ? 0 : ZMQ_SNDMORE) << "\n"; - parts++; - - cur = end; - } - } - - ++cur_segment; - if (cur_segment >= segments) { - cur_segment = 0; - } - gps_n += step; - if (gps_n >= max_gps_n) { - ++gps; - gps_n = 0; - } - - { - long cur_gps, cur_gps_n; - get_time(cur_gps, cur_gps_n); - if (cur_gps > gps || (cur_gps == gps && cur_gps_n >= gps_n)) { - std::cerr << "Late wanted " << gps << ":" << gps_n << " got " << cur_gps << ":" << cur_gps_n << std::endl; - } else if (config.verbose) { - std::cout << "Sent " << total_size << " bytes at " << gps << ":" << gps_n << " by " << cur_gps << ":" << cur_gps_n << " in " << parts << " parts\n"; - } - } - } -} - -int main(int argc, char **argv) { - Config cfg = parse_args(argc, argv); - - std::cout << "Seeding with " << cfg.random_seed << std::endl; - std::mt19937_64 rand_generator{cfg.random_seed}; - -// for (int i = 0; i < cfg.dcu_count; ++i) { -// std::cout << "Generating dcu " << i << std::endl; -// -// } - -// std::cout << "Timing a lot of sin calls" << std::endl; -// auto t0 = std::chrono::steady_clock::now(); -// for (auto i = 0; i < 100000; ++i) { -// std::sin(static_cast<double>(1.5)); -// } -// auto t1 = std::chrono::steady_clock::now(); -// std::chrono::duration<double> delta = t1-t0; -// -// std::cout << "Did lots of sin calls in " << delta.count() << " units of time" << std::endl; - - zmq::context_t context(cfg.threads); - zmq::socket_t publisher(context, ZMQ_PUB); - - { - if (cfg.rate_limit > 0) { - std::cout << "Setting rate limit to " << cfg.rate_limit << "Kb/s" << std::endl; - publisher.setsockopt(ZMQ_RATE, &cfg.rate_limit, sizeof(cfg.rate_limit)); - } - std::cout << "Setting recovery_ivl to " << cfg.recovery_ms << std::endl; - publisher.setsockopt(ZMQ_RECOVERY_IVL, &cfg.recovery_ms, sizeof(cfg.recovery_ms)); - int rate = 0; - size_t rate_size = sizeof(rate); - publisher.getsockopt(ZMQ_RATE, &rate, &rate_size); - std::cout << "Rate limit confirmed at " << rate << "Kb/s" << std::endl; - } - if (cfg.verbose) - std::cout << "Binding to " << cfg.bind_point << std::endl; - publisher.bind(cfg.bind_point.c_str()); - if (cfg.verbose) - std::cout << "Starting send loop" << std::endl; - simple_send_loop(publisher, cfg); - return 1; -} diff --git a/src/zmq_stream/tests/zmq_firehose_sub.cc b/src/zmq_stream/tests/zmq_firehose_sub.cc deleted file mode 100644 index d8566c1d18ed4e021bf19eba631fef37f87a6380..0000000000000000000000000000000000000000 --- a/src/zmq_stream/tests/zmq_firehose_sub.cc +++ /dev/null @@ -1,156 +0,0 @@ -// -// Created by jonathan.hanks on 7/26/17. -// -#include <array> -#include <cmath> -#include <iostream> -#include <random> -#include <sstream> -#include <string> -#include <vector> - -#include <zmq.hpp> -#include "../zmq_daq.h" - -#include "firehose_structs.hh" -#include "zmq_firehose_common.hh" - -struct Config { - std::string bind_point; - int rate_limit; - int threads; - bool verbose; - bool extra_verbose; - - Config(): bind_point{"tcp://127.0.0.1:5555"}, - rate_limit{0}, - threads{1}, - verbose{false}, - extra_verbose{false} - {} - Config(const Config& other) = default; - Config(Config&& other) = default; - Config& operator=(const Config& other) = default; - Config& operator=(Config&& other) = default; -}; - -Config parse_args(int argc, char **argv) { - Config cfg{}; - for (int i = 1; i < argc; ++i) { - std::string arg{argv[i]}; - std::string next_arg; - bool has_next = false; - if (i +1 < argc) { - next_arg = argv[i+1]; - has_next = true; - } - if (arg == "--connect") { - if (has_next) - cfg.bind_point = next_arg; - } - if (arg == "--threads") { - if (has_next) { - std::istringstream is{next_arg}; - is >> cfg.threads; - ++i; - } - } - if (arg == "-v" || arg == "--verbose") { - cfg.verbose = true; - } - if (arg == "-vv") { - cfg.verbose = true; - cfg.extra_verbose = true; - } - if (arg == "--rate-limit") { - if (has_next) { - std::istringstream is{next_arg}; - is >> cfg.rate_limit; - ++i; - } - } - } - return cfg; -}; - -void simple_recv_loop(zmq::socket_t& subscriber, const Config& config) { - - const int segments = 16; - const int max_gps_n = 1000 * 1000 * 1000; - const int step = max_gps_n/segments; - - long expected_gps=0; - long expected_gps_n=0; - - while(true) { - zmq::message_t msg(0); - size_t total_size = 0; - - long gps = 0, gps_n = 0; - bool error = false; - bool first = true; - - int parts = 0; - int more_parts = 0; - size_t opt_len = sizeof(more_parts); - do { - subscriber.recv(&msg); - ++parts; - - if (first) { - long *tmp = reinterpret_cast<long *>(msg.data()); - gps = tmp[0]; - gps_n = tmp[1]; - first = false; - } - - total_size += msg.size(); - opt_len = sizeof(more_parts); - subscriber.getsockopt(ZMQ_RCVMORE, &more_parts, &opt_len); - if (config.extra_verbose) - std::cerr << "\t" << msg.size() << "-" << more_parts << "\n"; - } while (more_parts != 0); - - if (expected_gps != 0) { - if (expected_gps != gps || expected_gps_n != gps_n) { - std::cerr << "ERROR: expecting " << expected_gps << ":" << expected_gps_n << " got " << gps << ":" << gps_n << " instead." << std::endl; - } - } - expected_gps = gps; - expected_gps_n = gps_n + step; - if (expected_gps_n >= max_gps_n) { - ++expected_gps; - expected_gps_n = 0; - } - - if (config.verbose) { - std::cout << "Recieved " << total_size << " bytes for " << gps << ":" << gps_n << " in " << parts << " parts\n"; - } - } -} - -int main(int argc, char **argv) { - Config cfg = parse_args(argc, argv); - - zmq::context_t context(cfg.threads); - zmq::socket_t subscriber(context, ZMQ_SUB); - - { - if (cfg.rate_limit > 0) { - std::cout << "Setting rate limit to " << cfg.rate_limit << "Kb/s" << std::endl; - subscriber.setsockopt(ZMQ_RATE, &cfg.rate_limit, sizeof(cfg.rate_limit)); - } - subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); - int rate = 0; - size_t rate_size = sizeof(rate); - subscriber.getsockopt(ZMQ_RATE, &rate, &rate_size); - std::cout << "Rate limit confirmed at " << rate << "Kb/s" << std::endl; - } - - std::cout << "connecting to " << cfg.bind_point << std::endl; - subscriber.connect(cfg.bind_point.c_str()); - - std::cout << "Beginning receive loop " << std::endl; - simple_recv_loop(subscriber, cfg); - return 1; -} diff --git a/src/zmq_stream/zmq_fe.c b/src/zmq_stream/zmq_fe.c deleted file mode 100644 index 9dd3017c8b2fd4eb02f39737267bb46537fad442..0000000000000000000000000000000000000000 --- a/src/zmq_stream/zmq_fe.c +++ /dev/null @@ -1,744 +0,0 @@ -// -///// @file fe_dc.c -///// @brief Front End data concentrator -//// -// -#define _GNU_SOURCE -#define _XOPEN_SOURCE 700 - -#include <ctype.h> -#include <stdio.h> -#include <string.h> -#include <stdlib.h> -#include <malloc.h> -#include <signal.h> -#include <unistd.h> -#include <sys/ioctl.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <stdlib.h> - -#include "../drv/crc.c" -#include "../include/daqmap.h" -#include "../include/drv/fb.h" -// #include "../include/daq_core.h" -#include "../drv/gpstime/gpstime.h" -#include <zmq.h> -#include <assert.h> -#include "zmq_daq.h" -#include "zmq_transport.h" -#include "dc_utils.h" -#include "simple_pv.h" - - -#define MAX_NSYS 24 - -#define __CDECL - -static struct rmIpcStr *shmIpcPtr[128]; -static char *shmDataPtr[128]; -static struct cdsDaqNetGdsTpNum *shmTpTable[128]; -static const int header_size = sizeof(struct daq_multi_dcu_header_t); -static const int buf_size = DAQ_DCU_BLOCK_SIZE * 2; -int modelrates[DAQ_TRANSIT_MAX_DCU]; -int dcuid[DAQ_TRANSIT_MAX_DCU]; -daq_multi_dcu_data_t *ixDataBlock; -daq_multi_cycle_header_t *ifo_header; -char *zbuffer; - -extern void *findSharedMemory(char *); -extern void *findSharedMemorySize(char *,int); - -char modelnames[DAQ_TRANSIT_MAX_DCU][64]; -char *sysname; -int do_verbose = 0; -// int sendLength = 0; -static volatile int keepRunning = 1; -char *ifo; -char *ifo_data; -size_t cycle_data_size; - -// ZMQ defines -//char msg_buffer[0x200000]; -daq_multi_dcu_data_t msg_buffer; -void *daq_context; -void *daq_publisher; - - -int symmetricom_fd = -1; -unsigned long window_start[16] = {63500,135000,197500,260000,322500,395000,447500,510000,572500,635000,697500,760000,822500,885000,947500,10000}; -unsigned long window_stop[16] = {135000,197500,260000,322500,395000,447500,510000,572500,635000,697500,760000,822500,885000,947500,999990,63500}; - - -/*********************************************************************************/ -/* U S A G E */ -/* */ -/*********************************************************************************/ - -void Usage() -{ - printf("Usage of zmq_fe:\n"); - printf("zmq_fe -s <models> <OPTIONS>\n"); - printf(" -b <buffer> : Name of the mbuf to concentrate the data to locally (defaults to ifo)\n"); - printf(" -s <value> : Name of FE control models\n"); - printf(" -m <value> : Local memory buffer size in megabytes\n"); - printf(" -v 1 : Enable verbose output\n"); - printf(" -e <interface> : Name of the interface to broadcast data through\n"); - printf(" -d <directory> : Path to the gds tp dir used to lookup model rates\n"); - printf(" -D <value> : Add a delay in ms to sending the data. Used to spread the load\n"); - printf(" : when working with multiple sending systems. Defaults to 0."); - printf(" -h : This helpscreen\n"); - printf("\n"); -} - -void zmq_make_connection(char *eport) -{ - char loc[200]; - int rc; - - // Set up the data publisher socket - daq_context = zmq_ctx_new(); - daq_publisher = zmq_socket (daq_context,ZMQ_PUB); - // sprintf(loc,"%s%d","tcp://*:",DAQ_DATA_PORT); - //sprintf(loc,"%s%s%s%d","tcp://",eport,":",DAQ_DATA_PORT); - if (!dc_generate_connection_string(loc, eport, sizeof(loc), 0)) { - fprintf(stderr, "Unable to generate connection string for '%s'\n", eport); - exit(1); - } - dc_set_zmq_options(daq_publisher); - printf("Binding to '%s'\n", loc); - rc = zmq_bind (daq_publisher, loc); - if (rc < 0) { - fprintf(stderr, "Errno = %d: %s\n", errno, strerror(errno)); - - } - assert (rc == 0); - printf("sending data on %s\n",loc); -} - -// ********************************************************************************************** -/// Get current GPS time from the symmetricom IRIG-B card -unsigned long -symm_gps_time(unsigned long *frac, int *stt) { - unsigned long t[3]; - - ioctl (symmetricom_fd, IOCTL_SYMMETRICOM_TIME, &t); - t[1] *= 1000; - t[1] += t[2]; - if (frac) *frac = t[1]; - if (stt) *stt = 0; - return t[0]; -} - -// ********************************************************************************************** -/// See if the GPS card is locked. -int -symm_ok() { - unsigned long req = 0; - ioctl (symmetricom_fd, IOCTL_SYMMETRICOM_STATUS, &req); - printf("Symmetricom status: %s\n", req? "LOCKED": "UNCLOCKED"); - return req; -} - -// ********************************************************************************************** -// This function uses time from IRIG-B card -// Abandoned due to interference with IOP model reading same card -int -waitNextCycle( unsigned int cyclereq, // Cycle to wait for - int reset, // Request to reset model ipc shared memory - struct rmIpcStr *ipcPtr) // Pointer to IOP IPC shared memory -{ - unsigned long gps_frac; - int gps_stt; - unsigned long gps_time; - int iopRunning = 0; - int runontimer = 0; - - // if reset, want to set IOP cycle to impossible number - if(reset) ipcPtr->cycle = 50; - // Find cycle - do { - usleep(1000); - // Get GPS time from gpstime driver - gps_time = symm_gps_time(&gps_frac, &gps_stt); - // Convert nanoseconds to microseconds - gps_frac /= 1000; - // If requested cycle matches IOP shared memory cycle, - // then IOP is running and requested cycle is found - if(ipcPtr->cycle == cyclereq) iopRunning = 1; - // If GPS time is within window that requested cycle should be ready, - // then return on timer event. - if(gps_frac > window_start[cyclereq] && gps_frac < window_stop[cyclereq]) runontimer = 1; - }while(iopRunning == 0 && runontimer == 0 && keepRunning); - // Return iopRunning: - // - One (1) if synced to IOP - // - Zero (0) if synced by GPS time ie IOP not running. - return(iopRunning); -} -// ********************************************************************************************** -int -waitNextCycle2( int nsys, - unsigned int cyclereq, // Cycle to wait for - int reset, // Request to reset model ipc shared memory - int dataRdy[], - struct rmIpcStr *ipcPtr[]) // Pointer to IOP IPC shared memory -{ - int iopRunning = 0; - int ii; - int threads_rdy = 0; - int timeout = 0; - unsigned int expected_time_sec = 0; - unsigned int local_model_time_sec = 0; - - // if reset, want to set IOP cycle to impossible number - if(reset) ipcPtr[0]->cycle = 50; - // Find cycle - // do { - usleep(1000); - // Wait until received data from at least 1 FE or timeout - do { - usleep(2000); - if(ipcPtr[0]->cycle == cyclereq) - { - iopRunning = 1; - dataRdy[0] = 1; - } - timeout += 1; - } while(!iopRunning && timeout < 500); - - expected_time_sec = ipcPtr[0]->bp[cyclereq].timeSec; - - // Wait until data received from everyone or timeout - timeout = 0; - do { - usleep(100); - for(ii=1;ii<nsys;ii++) { - local_model_time_sec = ipcPtr[ii]->bp[cyclereq].timeSec; - if(ipcPtr[ii]->cycle == cyclereq && local_model_time_sec == expected_time_sec && !dataRdy[ii]) threads_rdy ++; - if(ipcPtr[ii]->cycle == cyclereq && local_model_time_sec == expected_time_sec) dataRdy[ii] = 1; - } - timeout += 1; - }while(threads_rdy < nsys && timeout < 20); - - // }while(iopRunning == 0 && keepRunning); - // Return iopRunning: - // - One (1) if synced to IOP - // - Zero (0) if synced by GPS time ie IOP not running. - return(iopRunning); -} - -// ********************************************************************************************** -// Capture SIGHALT from ControlC -void intHandler(int dummy) { - keepRunning = 0; -} - -// ********************************************************************************************** -void print_diags(int nsys, int lastCycle, int sendLength, daq_multi_dcu_data_t *ixDataBlock) { - int ii = 0; - unsigned long sym_gps_sec = 0; - unsigned long sym_gps_nsec = 0; - - sym_gps_sec = symm_gps_time(&sym_gps_nsec, 0); - // Print diags in verbose mode - printf("\nTime = %d-%d size = %d\n",shmIpcPtr[0]->bp[lastCycle].timeSec,shmIpcPtr[0]->bp[lastCycle].timeNSec,sendLength); - printf("Sym gps = %d-%d (time received)\n", (int)sym_gps_sec, (int)sym_gps_nsec); - printf("\tCycle = "); - for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock->header.dcuheader[ii].cycle); - printf("\n\tTimeSec = "); - for(ii=0;ii<nsys;ii++) printf("\t%d",ixDataBlock->header.dcuheader[ii].timeSec); - printf("\n\tTimeNSec = "); - for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock->header.dcuheader[ii].timeNSec); - printf("\n\tDataSize = "); - for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock->header.dcuheader[ii].dataBlockSize); - printf("\n\tTPCount = "); - for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock->header.dcuheader[ii].tpCount); - printf("\n\tTPSize = "); - for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock->header.dcuheader[ii].tpBlockSize); - printf("\n\tXmitSize = "); - for(ii=0;ii<nsys;ii++) printf("\t\t%d",shmIpcPtr[ii]->dataBlockSize); - printf("\n\n "); -} - -// ********************************************************************************************** -// Get control model loop rates from GDS param files -// Needed to properly size TP data into the data stream -int getmodelrate( int *rate, int *dcuid, char *modelname, char *gds_tp_dir) { - char gdsfile[128]; - int ii = 0; - FILE *f = 0; - char *token = 0; - char *search = "="; - char line[80]; - char *s = 0; - char *s1 = 0; - - if (gds_tp_dir) { - sprintf(gdsfile, "%s/tpchn_%s.par", gds_tp_dir, modelname); - } else { - /// Need to get IFO and SITE info from environment variables. - s = getenv("IFO"); - for (ii = 0; s[ii] != '\0'; ii++) { - if (isupper(s[ii])) s[ii] = (char) tolower(s[ii]); - } - s1 = getenv("SITE"); - for (ii = 0; s1[ii] != '\0'; ii++) { - if (isupper(s1[ii])) s1[ii] = (char) tolower(s1[ii]); - } - sprintf(gdsfile, "/opt/rtcds/%s/%s/target/gds/param/tpchn_%s.par", s1, s, modelname); - } - f = fopen(gdsfile, "rt"); - if (!f) return 0; - while(fgets(line,80,f) != NULL) { - line[strcspn(line, "\n")] = 0; - if (strstr(line, "datarate") != NULL) { - token = strtok(line, search); - token = strtok(NULL, search); - if (!token) continue; - while (*token && *token == ' ') { - ++token; - } - *rate = atoi(token); - break; - } - } - fclose(f); - f = fopen(gdsfile, "rt"); - if (!f) return 0; - while(fgets(line,80,f) != NULL) { - line[strcspn(line, "\n")] = 0; - if (strstr(line, "rmid") != NULL) { - token = strtok(line, search); - token = strtok(NULL, search); - if (!token) continue; - while (*token && *token == ' ') { - ++token; - } - *dcuid = atoi(token); - break; - } - } - fclose(f); - - return 0; -} - - -// ********************************************************************************************** -int loadMessageBuffer( int nsys, - int lastCycle, - int status, - int dataRdy[], - int *nsys_ready, - int *nsys_ready_mask - ) -{ - int sendLength = 0; - int ii; - int daqStatBit[2]; - daqStatBit[0] = 1; - daqStatBit[1] = 2; - int dataXferSize; - char *dataBuff; - int myCrc = 0; - int crcLength = 0; - int cur_mask = 1; - - *nsys_ready = 0; - *nsys_ready_mask = 0; - // Set pointer to 0MQ message data block - zbuffer = (char *)&ixDataBlock->dataBlock[0]; - // Initialize data send length to size of message header - sendLength = header_size; - // Set number of FE models that have data in this message - ixDataBlock->header.fullDataBlockSize = 0; - int db = 0; - // Loop thru all FE models - for (ii=0;ii<nsys;ii++) { - if(dataRdy[ii]) { - ++(*nsys_ready); - *nsys_ready_mask = (*nsys_ready_mask) | cur_mask; - // Set heartbeat monitor for return to DAQ software - if (lastCycle == 0) shmIpcPtr[ii]->reqAck ^= daqStatBit[0]; - // Set DCU ID in header - ixDataBlock->header.dcuheader[db].dcuId = shmIpcPtr[ii]->dcuId; - // Set DAQ .ini file CRC checksum - ixDataBlock->header.dcuheader[db].fileCrc = shmIpcPtr[ii]->crc; - // Set 1/16Hz cycle number - ixDataBlock->header.dcuheader[db].cycle = shmIpcPtr[ii]->cycle; - // Set GPS seconds - ixDataBlock->header.dcuheader[db].timeSec = shmIpcPtr[ii]->bp[lastCycle].timeSec; - // Set GPS nanoseconds - ixDataBlock->header.dcuheader[db].timeNSec = shmIpcPtr[ii]->bp[lastCycle].timeNSec; - crcLength = shmIpcPtr[ii]->bp[lastCycle].crc; - // Set Status -- as running - ixDataBlock->header.dcuheader[ii].status = 2; - // Indicate size of data block - // ********ixDataBlock->header.dcuheader[db].dataBlockSize = shmIpcPtr[ii]->dataBlockSize; - ixDataBlock->header.dcuheader[db].dataBlockSize = crcLength; - // Prevent going beyond MAX allowed data size - if (ixDataBlock->header.dcuheader[db].dataBlockSize > DAQ_DCU_BLOCK_SIZE) - ixDataBlock->header.dcuheader[db].dataBlockSize = DAQ_DCU_BLOCK_SIZE; - // Calculate TP data size - ixDataBlock->header.dcuheader[db].tpCount = (unsigned int)shmTpTable[ii]->count; - ixDataBlock->header.dcuheader[db].tpBlockSize = sizeof(float) * modelrates[ii] * ixDataBlock->header.dcuheader[ii].tpCount / DAQ_NUM_DATA_BLOCKS_PER_SECOND; - - // Copy GDSTP table to xmission buffer header - memcpy(&(ixDataBlock->header.dcuheader[db].tpNum[0]), - &(shmTpTable[ii]->tpNum[0]), - sizeof(int)*ixDataBlock->header.dcuheader[db].tpCount); - - // Set pointer to dcu data in shared memory - dataBuff = (char *)(shmDataPtr[ii] + lastCycle * buf_size); - // Copy data from shared memory into local buffer - dataXferSize = ixDataBlock->header.dcuheader[db].dataBlockSize + ixDataBlock->header.dcuheader[db].tpBlockSize; - //dataXferSize = shmIpcPtr[ii]->dataBlockSize; - memcpy((void *)zbuffer, dataBuff, dataXferSize); - - // Calculate CRC on the data and add to header info - myCrc = 0; - myCrc = crc_ptr((char *)zbuffer, crcLength, 0); - myCrc = crc_len(crcLength, myCrc); - ixDataBlock->header.dcuheader[db].dataCrc = myCrc; - - // Increment the 0mq data buffer pointer for next FE - zbuffer += dataXferSize; - // Increment the 0mq message size with size of FE data block - sendLength += dataXferSize; - // Increment the data block size for the message, this includes regular data + TP data - ixDataBlock->header.fullDataBlockSize += dataXferSize; - - // Update heartbeat monitor to DAQ code - if (lastCycle == 0) shmIpcPtr[ii]->reqAck ^= daqStatBit[1]; - db ++; - } - cur_mask <<= 1; - } - ixDataBlock->header.dcuTotalModels = db; - return sendLength; -} - -// ********************************************************************************************** -int send_to_local_memory(int nsys, int xmitData, int send_delay_ms, int pv_debug_pipe, char *pv_prefix) -{ - int do_wait = 1; - int daqStatBit[2]; - daqStatBit[0] = 1; - daqStatBit[1] = 2; - char *nextData; - - - int ii; - int lastCycle = 0; - unsigned int nextCycle = 0; - - int sync2iop = 1; - int status = 0; - int dataRdy[MAX_NSYS]; - //int msg_size; - int sendLength = 0; - int time_delta = 0; - int expected_nsys = nsys; - int actual_nsys = 0; - int actual_nsys_mask = 0; - - - SimplePV pvs[] = { - { - "FE_EXPECTED_DCUS", - SIMPLE_PV_INT, - (void*)&expected_nsys, - nsys + 2, - 0, - nsys + 1, - nsys-1, - }, - { - "FE_ACTUAL_DCUS", - SIMPLE_PV_INT, - (void*)&actual_nsys, - nsys + 2, - 0, - nsys + 1, - nsys-1, - }, - { - "FE_DATA_SIZE", - SIMPLE_PV_INT, - (void *)&sendLength, - 4*1024*1024, - 0, - 4*1024*1024, - 0, - }, - { - "FE_TIME_DELTA_MS", - SIMPLE_PV_INT, - (void *)&time_delta, - 62, - 0, - 60, - 1, - }, - { - "FE_SYS_READY_MASK", - SIMPLE_PV_INT, - (void*)&actual_nsys_mask, - 0x7fffffff, - 0, - 0x7fffffff, - 0, - }, - }; - - - for(ii=0;ii<MAX_NSYS;ii++) dataRdy[ii] = 0; - - - //long cur_gps=0, cur_nano=0; - // seg_gps=0, seg_nano=0; - int dummy = 0; - - do { - - for(ii=0;ii<nsys;ii++) dataRdy[ii] = 0; - - status = waitNextCycle2(nsys,nextCycle,sync2iop,dataRdy,shmIpcPtr); - // status = waitNextCycle(nextCycle,sync2iop,shmIpcPtr[0]); - if(!status) { - keepRunning = 0;; - return(0); - } - else sync2iop = 0; - - // IOP will be first model ready - // Need to wait for 2K models to reach end of their cycled - usleep((do_wait * 1000)); - - - nextData = (char *)ifo_data; - nextData += cycle_data_size * nextCycle; - ixDataBlock = (daq_multi_dcu_data_t *)nextData; - actual_nsys = 0; - actual_nsys_mask = 0; - sendLength = loadMessageBuffer(nsys, nextCycle, status, dataRdy, &actual_nsys, &actual_nsys_mask); - // Print diags in verbose mode - if(nextCycle == 0 && do_verbose) print_diags(nsys,lastCycle,sendLength,ixDataBlock); - // Write header info - ifo_header->curCycle = nextCycle; - ifo_header->cycleDataSize = cycle_data_size; - ifo_header->maxCycle = DAQ_NUM_DATA_BLOCKS_PER_SECOND; - - if(xmitData) { - // Copy data to 0mq message buffer - memcpy((void*)&msg_buffer,nextData,sendLength); - // Send Data - //msg_size = zmq_send(daq_publisher,(void*)&msg_buffer,sendLength,0); - usleep(send_delay_ms * 1000); - zmq_send_daq_multi_dcu_t(&msg_buffer, daq_publisher, 0); - // printf("Sending data size = %d\n",msg_size); - } - //seg_gps = ixDataBlock->header.dcuheader[0].timeSec; - //seg_nano = ixDataBlock->header.dcuheader[0].timeNSec * 62500000; - //cur_gps = (long)symm_gps_time((unsigned long*)&cur_nano, &dummy); - - - //time_delta = (cur_gps - seg_gps)*1000 + (cur_nano - seg_nano)/1000000; - /* fprintf(stderr, "cgps %d:%d seg_gps %d:%d delta_ms: %d\n", cur_gps, cur_nano, seg_gps, seg_nano, time_delta); */ - - send_pv_update(pv_debug_pipe, pv_prefix, pvs, sizeof(pvs)/sizeof(pvs[0])); - - nextCycle = (nextCycle + 1) % 16; - - } while (keepRunning); /* do this until sighalt */ - - printf("\n***********************************************************\n\n"); - - - return 0; -} - - -/*********************************************************************************/ -/* M A I N */ -/* */ -/*********************************************************************************/ - -int __CDECL -main(int argc,char *argv[]) -{ - int counter = 0; - int nsys = 1; - int dcuId[MAX_NSYS]; - int ii = 0; - char *gds_tp_dir = 0; - int max_data_size_mb = 64; - int max_data_size = 0; - int error = 0; - int status = -1; - unsigned long gps_frac = 0; - int gps_stt = 0; - int gps_ok = 0; - unsigned long gps_time = 0; - char *eport = 0; - int sendViaZmq = 0; - char *buffer_name = "ifo"; - int send_delay_ms = 0; - char *pv_prefix = 0; - char *pv_debug_pipe_name = 0; - int pv_debug_pipe = -1; - - ii = 0; - - if (argc<3) { - Usage(); - return(-1); - } - - /* Get the parameters */ - while ((counter = getopt(argc, argv, "b:e:m:l:h:v:s:d:D:p:P:")) != EOF) - switch(counter) { - case 'b': - buffer_name = optarg; - break; - - case 'm': - max_data_size_mb = atoi(optarg); - if (max_data_size_mb < 20){ - return -1; - } - if (max_data_size_mb > 100){ - return -1; - } - break; - - case 's': - sysname = optarg; - continue; - case 'v': - do_verbose = atoi(optarg); - break; - case 'e': - eport = optarg; - sendViaZmq = 1; - break; - case 'd': - gds_tp_dir = optarg; - break; - case 'l': - if (0 == freopen(optarg, "w", stdout)) { - perror ("freopen"); - exit (1); - } - setvbuf(stdout, NULL, _IOLBF, 0); - stderr = stdout; - break; - case 'h': - Usage(); - return(0); - case 'D': - send_delay_ms = atoi(optarg); - break; - case 'p': - pv_prefix = optarg; - break; - case 'P': - pv_debug_pipe_name = optarg; - break; - } - if (pv_debug_pipe_name) - { - pv_debug_pipe = open(pv_debug_pipe_name, O_NONBLOCK | O_RDWR, 0); - if (pv_debug_pipe < 0) - { - fprintf(stderr, "Unable to open %s for writing (pv status)\n", pv_debug_pipe_name); - exit(1); - } - } - - max_data_size = max_data_size_mb * 1024*1024; - - if (sendViaZmq) printf("Writing DAQ data to local shared memory and sending out on ZMQ\n"); - else printf("Writing DAQ data to local shared memory only \n"); - if(sysname != NULL) { - printf("System names: %s\n", sysname); - sprintf(modelnames[0],"%s",strtok(sysname, " ")); - for(;;) { - char *s = strtok(0, " "); - if (!s) break; - if (nsys == (MAX_NSYS - 1)) - { - fprintf(stderr, "Too many system names passed, max is %d\n", MAX_NSYS); - exit(1); - } - sprintf(modelnames[nsys],"%s",s); - dcuId[nsys] = 0; - nsys++; - } - } else { - Usage(); - return(0); - } - - if(sendViaZmq) zmq_make_connection(eport); - // Open file descriptor for the gpstime driver - symmetricom_fd = open ("/dev/gpstime", O_RDWR | O_SYNC); - if (symmetricom_fd < 0) { - perror("/dev/gpstime"); - exit(1); - } - - gps_ok = symm_ok(); - gps_time = symm_gps_time(&gps_frac, &gps_stt); - printf("GPS TIME = %ld\tfrac = %ld\tstt = %d\n",gps_time,gps_frac,gps_stt); - - // Parse the model names from the command line entry - for(ii=0;ii<nsys;ii++) { - char shmem_fname[128]; - sprintf(shmem_fname, "%s_daq", modelnames[ii]); - void *dcu_addr = findSharedMemory(shmem_fname); - if (dcu_addr == NULL) { - fprintf(stderr, "Can't map shmem\n"); - exit(-1); - } else { - printf(" %s mapped at 0x%lx\n",modelnames[ii],(unsigned long)dcu_addr); - } - shmIpcPtr[ii] = (struct rmIpcStr *)((char *)dcu_addr + CDS_DAQ_NET_IPC_OFFSET); - shmDataPtr[ii] = ((char *)dcu_addr + CDS_DAQ_NET_DATA_OFFSET); - shmTpTable[ii] = (struct cdsDaqNetGdsTpNum *)((char *)dcu_addr + CDS_DAQ_NET_GDS_TP_TABLE_OFFSET); - } - // Get model rates to get GDS TP data sizes. - for (ii = 0; ii < nsys; ii++) { - status = getmodelrate(&modelrates[ii],&dcuid[ii],modelnames[ii], gds_tp_dir); - printf("Model %s rate = %d dcuid = %d\n",modelnames[ii],modelrates[ii],dcuid[ii]); - if (status != 0) { - fprintf(stderr, "Unable to determine the rate of %s\n", modelnames[ii]); - exit(1); - } - } - - // Get pointers to local DAQ mbuf - ifo = (char *)findSharedMemorySize(buffer_name, max_data_size_mb); - ifo_header = (daq_multi_cycle_header_t *)ifo; - ifo_data = (char *)ifo + sizeof(daq_multi_cycle_header_t); - cycle_data_size = (max_data_size - sizeof(daq_multi_cycle_header_t))/DAQ_NUM_DATA_BLOCKS_PER_SECOND; - cycle_data_size -= (cycle_data_size % 8); - - signal(SIGINT,intHandler); - sleep(1); - - - - // Enter infinite loop of reading control model data and writing to local shared memory - do { - error = send_to_local_memory(nsys, sendViaZmq, send_delay_ms, pv_debug_pipe, pv_prefix); - } while (error == 0 && keepRunning == 1); - if(sendViaZmq) { - printf("Closing out ZMQ and exiting\n"); - zmq_close(daq_publisher); - zmq_ctx_destroy(daq_context); - } - - return 0; -} diff --git a/src/zmq_stream/zmq_xmit.c b/src/zmq_stream/zmq_xmit.c new file mode 100644 index 0000000000000000000000000000000000000000..2c57c1e683ed75066baaa6d326c554002759f7b3 --- /dev/null +++ b/src/zmq_stream/zmq_xmit.c @@ -0,0 +1,310 @@ +// +///// @file fe_dc.c +///// @brief Front End data concentrator +//// +// +#define _GNU_SOURCE +#define _XOPEN_SOURCE 700 + +#include <ctype.h> +#include <fcntl.h> +#include <malloc.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include "../drv/crc.c" +#include "../include/daqmap.h" +#include "../include/drv/fb.h" +#include "../include/daq_core.h" +#include "../drv/gpstime/gpstime.h" +#include "dc_utils.h" +#include "zmq_transport.h" +#include <assert.h> +#include <zmq.h> + +#define MAX_NSYS 24 + +#define __CDECL + +static const int buf_size = DAQ_DCU_BLOCK_SIZE * 2; +daq_multi_cycle_header_t* ifo_header; + +extern void* findSharedMemory( char* ); +extern void* findSharedMemorySize( char*, int ); + +int do_verbose = 0; +// int sendLength = 0; +static volatile int keepRunning = 1; +char* ifo; +char* ifo_data; +size_t cycle_data_size; + +// ZMQ defines +// char msg_buffer[0x200000]; +void* daq_context; +void* daq_publisher; + +/*********************************************************************************/ +/* U S A G E */ +/* */ +/*********************************************************************************/ + +void +Usage( ) +{ + printf( "Usage of zmq_xmit:\n" ); + printf( "zmq_xmit -s <models> <OPTIONS>\n" ); + printf( + " -b <buffer> : Name of the mbuf to read data from [local_dc]\n" ); + printf( " -m <value> : Local memory buffer size in megabytes\n" ); + printf( " -v 1 : Enable verbose output\n" ); + printf( + " -e <interface> : Name of the interface to broadcast data through\n" ); + printf( " -D <value> : Add a delay in ms to sending the data. Used to " + "spread the load\n" ); + printf( " : when working with multiple sending systems. " + "Defaults to 0." ); + printf( " -h : This helpscreen\n" ); + printf( "\n" ); +} + +void +zmq_make_connection( char* eport ) +{ + char loc[ 200 ]; + int rc; + + // Set up the data publisher socket + daq_context = zmq_ctx_new( ); + daq_publisher = zmq_socket( daq_context, ZMQ_PUB ); + // sprintf(loc,"%s%d","tcp://*:",DAQ_DATA_PORT); + // sprintf(loc,"%s%s%s%d","tcp://",eport,":",DAQ_DATA_PORT); + if ( !dc_generate_connection_string( loc, eport, sizeof( loc ), 0 ) ) + { + fprintf( + stderr, "Unable to generate connection string for '%s'\n", eport ); + exit( 1 ); + } + dc_set_zmq_options( daq_publisher ); + printf( "Binding to '%s'\n", loc ); + rc = zmq_bind( daq_publisher, loc ); + if ( rc < 0 ) + { + fprintf( stderr, "Errno = %d: %s\n", errno, strerror( errno ) ); + } + assert( rc == 0 ); + printf( "sending data on %s\n", loc ); +} + +// ********************************************************************************************** + +/** + * @brief Set the cycle counter to an invalid value. + * @param header pointer to the input block header + * @note Used to force a resync of the counter. + */ +void +reset_cycle_counter( volatile daq_multi_cycle_header_t* header ) +{ + header->curCycle = 0x50505050; +} + +/** + * @brief wait until the data in the input shared mem buffer has the + * @param header pointer to the input block header + * requested cycle counter. + * + * @returns non-zero if it we timeout + */ +int +wait_for_cycle( volatile daq_multi_cycle_header_t* header, + unsigned int requested_cycle ) +{ + int timeout = 0; + + do + { + usleep( 2000 ); + if ( header->curCycle == requested_cycle ) + { + return 0; + } + ++timeout; + } while ( timeout < 500 ); + return 1; +} + +/** + * @brief Check to see if the requested input data is in the input buffer + * @param header pointer to the input block header + * @param the cycle you are checking data for + * @param max_data_size the max size of the input buffer + * @return 0 if safe, != 0 if overflow + */ +int +data_will_overflow( volatile daq_multi_cycle_header_t* header, + int cycle, + int max_data_size ) +{ + return ( ( ( cycle + 1 ) * header->cycleDataSize ) > max_data_size ); +} + +/** + * @brief Return a pointer to the daq_multi_dcu_data_t* for a given cycle + * @param header The main input buffer + * @param cycle The cycle to retrieve data for + * @returns A pointer to the cycles data + * @note does not do bounds checking + */ +volatile daq_multi_dcu_data_t* +get_cycle_data( volatile daq_multi_cycle_header_t* header, int cycle ) +{ + int offset = cycle * header->cycleDataSize; + volatile char* data_block = + &( ( (volatile daq_multi_cycle_data_t*)header )->dataBlock[ 0 ] ); + return (volatile daq_multi_dcu_data_t*)( data_block + offset ); +} + +// ********************************************************************************************** +// Capture SIGHALT from ControlC +void +intHandler( int dummy ) +{ + keepRunning = 0; +} + +// ********************************************************************************************** + +void +print_diags( volatile daq_multi_dcu_data_t* cycle_block ) +{ + printf( "Total models %d\nData size %d\n\n", + (int)cycle_block->header.dcuTotalModels, + (int)cycle_block->header.fullDataBlockSize ); +} + +/*********************************************************************************/ +/* M A I N */ +/* */ +/*********************************************************************************/ + +int __CDECL + main( int argc, char* argv[] ) +{ + int counter = 0; + int max_data_size_mb = 64; + int max_data_size = 0; + int error = 0; + char* eport = 0; + char* buffer_name = "local_dc"; + int send_delay_ms = 0; + + if ( argc < 3 ) + { + Usage( ); + return ( -1 ); + } + + /* Get the parameters */ + while ( ( counter = getopt( argc, argv, "b:m:v:e:l:hD:" ) ) != EOF ) + { + switch ( counter ) + { + case 'b': + buffer_name = optarg; + break; + + case 'm': + max_data_size_mb = atoi( optarg ); + if ( max_data_size_mb < 20 ) + { + return -1; + } + if ( max_data_size_mb > 100 ) + { + return -1; + } + break; + case 'v': + do_verbose = atoi( optarg ); + break; + case 'e': + eport = optarg; + break; + case 'l': + if ( 0 == freopen( optarg, "w", stdout ) ) + { + perror( "freopen" ); + exit( 1 ); + } + setvbuf( stdout, NULL, _IOLBF, 0 ); + stderr = stdout; + break; + case 'h': + Usage( ); + return ( 0 ); + case 'D': + send_delay_ms = atoi( optarg ); + break; + } + } + + max_data_size = max_data_size_mb * 1024 * 1024; + + zmq_make_connection( eport ); + + // Get pointers to local DAQ mbuf + ifo_header = findSharedMemorySize( buffer_name, max_data_size_mb ); + + signal( SIGINT, intHandler ); + + // Enter infinite loop of reading control model data and writing to local + // shared memory + reset_cycle_counter( ifo_header ); + int cycle = 0; + do + { + if ( wait_for_cycle( ifo_header, cycle ) != 0 ) + { + fprintf( stderr, "Unable to sync with data" ); + break; + } + + if ( data_will_overflow( ifo_header, cycle, max_data_size ) ) + { + fprintf( stderr, + "Overflow, required data is out of the input buffer" ); + break; + } + + if ( send_delay_ms > 0 ) + { + usleep( 1000 * send_delay_ms ); + } + + zmq_send_daq_multi_dcu_t( + (daq_multi_dcu_data_t*)get_cycle_data( ifo_header, cycle ), + daq_publisher, + 0 ); + + if ( cycle == 0 && do_verbose ) + { + print_diags( get_cycle_data( ifo_header, cycle ) ); + } + + ++cycle; + cycle %= 16; + } while ( error == 0 && keepRunning == 1 ); + + printf( "Closing out ZMQ and exiting\n" ); + zmq_close( daq_publisher ); + zmq_ctx_destroy( daq_context ); + + return 0; +}