...
 
Commits (2)
......@@ -17,13 +17,8 @@ target_include_directories(zmq_transport PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../i
target_include_directories(zmq_transport PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(zmq_transport PRIVATE zmq4::zmq)
#add_library(zmq_dc_recv STATIC zmq_dc_recv.c zmq_dc_recv.h)
#target_include_directories(zmq_dc_recv PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_dc_recv PUBLIC zmq4::zmq ${CMAKE_THREAD_LIBS_INIT})
#add_library(zmq::dc_recv ALIAS zmq_dc_recv)
add_executable(zmq_xmit zmq_fe.c dc_utils.c)
target_link_libraries(zmq_xmit PUBLIC driver::shmem zmq_transport zmq4::zmq zmq::simple_pv ${CMAKE_THREAD_LIBS_INIT})
add_executable(zmq_xmit zmq_xmit.c dc_utils.c)
target_link_libraries(zmq_xmit PUBLIC driver::shmem zmq_transport zmq4::zmq ${CMAKE_THREAD_LIBS_INIT})
add_executable(zmq_recv zmq_recv.cc dc_utils.c)
# target_compile_options(zmq_rcv_ix_xmit_cxx PUBLIC -fsanitize=address)
......@@ -38,69 +33,12 @@ target_link_libraries(zmq_recv PUBLIC
${CMAKE_THREAD_LIBS_INIT}
# -no-pie
)
install(TARGETS zmq_recv DESTINATION bin)
#add_executable(zmq_multi_stream zmq_multi_stream.c ../drv/rfm.c ${CMAKE_CURRENT_SOURCE_DIR}/../include/daq_core.h)
#target_include_directories(zmq_multi_stream PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_multi_stream PUBLIC zmq4::zmq)
#target_compile_options(zmq_multi_stream PUBLIC -std=gnu99)
#add_executable(zmq_threads zmq_threads.c dc_utils.c)
#target_include_directories(zmq_threads PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_threads PUBLIC
# driver::shmem
# zmq_transport
# zmq4::zmq
# zmq::simple_pv
# ${CMAKE_THREAD_LIBS_INIT})
#target_compile_options(zmq_threads PUBLIC -std=gnu99)
#add_executable(zmq_dc zmq_dc.cc)
#target_include_directories(zmq_dc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_dc PUBLIC zmq_dc_recv zmq4::zmq ${CMAKE_THREAD_LIBS_INIT})
#target_compile_options(zmq_dc PUBLIC ${CPP11_FLAG})
#add_executable(zmq_recv zmq_recv.cc ../drv/rfm.c ${CMAKE_CURRENT_SOURCE_DIR}/../include/daq_core.h)
#target_include_directories(zmq_recv PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_recv PUBLIC zmq_dc_recv zmq4::zmq driver::shmem ${CMAKE_THREAD_LIBS_INIT})
#if (${CXX_HAS_ATOMIC})
#else (${CXX_HAS_ATOMIC})
# target_compile_definitions(zmq_recv PUBLIC "NO_STD_ATOMIC=1")
#endif (${CXX_HAS_ATOMIC})
#target_compile_options(zmq_recv PUBLIC ${CPP11_FLAG})
configure_file(test_zmq_xmit_recv.sh.in test_zmq_xmit_recv.sh @ONLY)
#add_executable(zmq_proxy zmq_proxy.c)
#target_include_directories(zmq_proxy PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_proxy PUBLIC zmq4::zmq)
#target_compile_options(zmq_proxy PUBLIC -std=gnu99)
#add_executable(zmq_proxy_client zmq_proxy_client.c)
#target_include_directories(zmq_proxy_client PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_proxy_client PUBLIC zmq4::zmq)
#target_compile_options(zmq_proxy_client PUBLIC -std=gnu99)
#add_executable(zmq_rcv_from_dc zmq_rcv_from_dc.c)
#target_include_directories(zmq_rcv_from_dc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
#target_link_libraries(zmq_rcv_from_dc PUBLIC zmq4::zmq)
#target_compile_options(zmq_rcv_from_dc PUBLIC -std=gnu99)
if (${CXX_HAS_ATOMIC})
# don't even try building these on pre c++11 compilers.
add_executable(zmq_daq_test_data_firehose tests/zmq_firehose_pub.cc tests/zmq_firehose_common.cc)
target_include_directories(zmq_daq_test_data_firehose PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
target_include_directories(zmq_daq_test_data_firehose PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/tests)
target_link_libraries(zmq_daq_test_data_firehose PUBLIC zmq4::zmq)
target_requires_cpp11(zmq_daq_test_data_firehose PUBLIC)
#target_compile_features(zmq_daq_test_data_firehose PUBLIC cxx_defaulted_functions)
add_executable(zmq_daq_test_data_firehose_recv tests/zmq_firehose_sub.cc tests/zmq_firehose_common.cc)
target_include_directories(zmq_daq_test_data_firehose_recv PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
target_include_directories(zmq_daq_test_data_firehose_recv PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/tests)
target_link_libraries(zmq_daq_test_data_firehose_recv PUBLIC zmq4::zmq)
target_requires_cpp11(zmq_daq_test_data_firehose_recv PUBLIC)
#target_compile_features(zmq_daq_test_data_firehose_recv PUBLIC cxx_defaulted_functions)
endif (${CXX_HAS_ATOMIC})
add_test(NAME "test_zmq_xmit_recv"
COMMAND /bin/bash ./test_zmq_xmit_recv.sh
WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
install(TARGETS zmq_recv DESTINATION bin)
install(TARGETS zmq_xmit DESTINATION bin)
......@@ -15,7 +15,7 @@ TARGETS=zmq_xmit
#zmq_multi_stream: zmq_multi_stream.o rfm.o
#zmqms: zmqms.o rfm.o
zmq_xmit: zmq_fe.o rfm.o zmq_transport.o dc_utils.o simple_pv.o
zmq_xmit: zmq_xmit.o rfm.o zmq_transport.o dc_utils.o simple_pv.o
#gdstp_test: gdstp_test.o rfm.o
#zmq_multi_rcvr: zmq_multi_rcvr.o
#zmq_threads: zmq_threads.o rfm.o zmq_transport.o dc_utils.o
......@@ -38,7 +38,7 @@ LDFLAGS = -L/usr/local/lib -lzmq -lpthread
.PHONY: clean
clean:
rm -r *.o zmq_multi_stream zmq_multi_rcvr zmqms zmq_fe zmq_xmit
rm -r *.o zmq_multi_stream zmq_multi_rcvr zmqms zmq_xmit
.PHONY: distclean
distclean: clean
......
#!/bin/bash
CWD="@CMAKE_CURRENT_BINARY_DIR@"
TDIR=""
PID_STREAM0=0
PID_STREAM1=0
PID_STREAM2=0
PID_LOCAL_DC=0
PID_ZMQ_XMIT=0
PID_ZMQ_RECV=0
function kill_proc {
if [ $1 -gt 0 ]; then
echo "Closing process $1"
kill $1
fi
}
function cleanup {
if [ "x$TDIR" != "x" ]; then
if [ -d $TDIR ]; then
rm -rf "$TDIR"
fi
fi
kill_proc $PID_STREAM0
kill_proc $PID_STREAM1
kill_proc $PID_STREAM2
kill_proc $PID_LOCAL_DC
kill_proc $PID_ZMQ_XMIT
kill_proc $PID_ZMQ_RECV
}
LOCAL_DC="$CWD/../local_dc/local_dc"
if [ ! -x "$LOCAL_DC" ]; then
echo "$LOCAL_DC"
exit 1
fi
ZMQ_XMIT="$CWD/zmq_xmit"
if [ ! -x "$ZMQ_XMIT" ]; then
echo "zmq_emit is required"
exit 1
fi
ZMQ_RECV="$CWD/zmq_recv"
if [ ! -x "$ZMQ_RECV" ]; then
echo "zmq_recv is required"
exit 1
fi
FE_STREAM_CHECK="$CWD/../fe_stream_test/fe_stream_check"
if [ ! -x "$FE_STREAM_CHECK" ]; then
echo "fe_stream_check is required"
exit 1
fi
FE_STREAM_TEST="$CWD/../fe_stream_test/fe_stream_test"
if [ ! -x "$FE_STREAM_TEST" ]; then
echo "fe_stream_test is required"
exit 1
fi
if [ ! -w /dev/gpstime ]; then
echo "the gpstime module must be loaded, configured, and accessible by this user"
exit 1
fi
if [ ! -w /dev/mbuf ]; then
echo "the mbuf module must be loaded, configured, and accessible by this user"
exit 1
fi
PYTHON=""
which python > /dev/null
if [ $? -eq 0 ]; then
PYTHON=`which python`
else
which python3 > /dev/null
if [ $? -eq 0 ]; then
PYTHON=`which python3`
else
echo "Cannot find python or python3"
exit 1
fi
fi
trap cleanup EXIT
TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"`
mkdir "$TDIR/ini_files"
mkdir "$TDIR/logs"
echo "Ini dir = $TDIR/ini_files"
"$FE_STREAM_TEST" -d 50 -r 1 -s x1iop1 -i "$TDIR/ini_files" > "$TDIR/logs/iop1" &
PID_STREAM0=$!
"$FE_STREAM_TEST" -d 61 -r 3 -s x1mod1 -i "$TDIR/ini_files" > "$TDIR/logs/mod1" &
PID_STREAM1=$!
"$FE_STREAM_TEST" -d 70 -r 3 -s x1mod2 -i "$TDIR/ini_files" > "$TDIR/logs/mod2" &
PID_STREAM2=$!
echo "Streamer PIDS = $PID_STREAM0 $PID_STREAM1 $PID_STREAM2"
"$LOCAL_DC" -m 100 -s "x1iop1 x1mod1 x1mod2" -d "$TDIR/ini_files" -b "local_dc" > "$TDIR/logs/local_dc" &
PID_LOCAL_DC=$!
sleep 1
"$ZMQ_XMIT" -b "local_dc" -m 100 -e lo > "$TDIR/logs/zmq_xmit.log" &
PID_ZMQ_XMIT=$!
"$ZMQ_RECV" -b "zmq_recv" -m 100 -s localhost > "$TDIR/logs/zmq_recv.log" &
PID_ZMQ_RECV=$!
"$FE_STREAM_CHECK" -m zmq_recv -s 100 -v -c "$TDIR/ini_files/x1iop1.ini" "$TDIR/ini_files/tpchn_x1iop1.par" \
-c "$TDIR/ini_files/x1mod1.ini" "$TDIR/ini_files/tpchn_x1mod1.par" \
-c "$TDIR/ini_files/x1mod2.ini" "$TDIR/ini_files/tpchn_x1mod2.par"
RESULT=$?
exit $?
#echo "Press enter to continue..."
#DUMMY=""
#read DUMMY
\ No newline at end of file
//
// Created by jonathan.hanks on 7/26/17.
//
#ifndef DAQD_TRUNK_FIREHOSE_DCU_HH
#define DAQD_TRUNK_FIREHOSE_DCU_HH
#include <string>
class FirehoseGenerator {
};
class FirehoseDCU {
public:
private:
int id_;
};
#endif //DAQD_TRUNK_FIREHOSE_DCU_HH
//
// Created by jonathan.hanks on 7/27/17.
//
#include <time.h>
#include <unistd.h>
extern "C" {
void get_time(long &gps, long &gps_n) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
gps = static_cast<long>(ts.tv_sec);
gps_n = static_cast<long>(ts.tv_nsec);
}
void wait_for(long gps, long gps_n) {
if (gps == 0) {
long dummy;
get_time(gps, dummy);
}
while (true) {
long cur_gps, cur_gps_n;
get_time(cur_gps, cur_gps_n);
if (gps < cur_gps) break;
if (gps == cur_gps && gps_n <= cur_gps_n) break;
usleep(1);
}
}
}
\ No newline at end of file
//
// Created by jonathan.hanks on 7/27/17.
//
#ifndef DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H
#define DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
extern void get_time(long &gps, long &gps_n);
extern void wait_for(long gps, long gps_n);
#ifdef __cplusplus
}
#endif
#endif //DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H
//
// Created by jonathan.hanks on 7/26/17.
//
#include <algorithm>
#include <array>
#include <cmath>
#include <ctime>
//#include <chrono>
#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <vector>
#include <zmq.hpp>
#include "../zmq_daq.h"
#include "firehose_structs.hh"
#include "zmq_firehose_common.hh"
struct Config {
std::string bind_point;
std::string master_filename;
int dcu_count;
int chans_per_dcu;
int max_generators;
std::mt19937_64::result_type random_seed;
size_t data_size;
int rate_limit;
int recovery_ms;
int threads;
int chunk_size;
bool verbose;
bool extra_verbose;
bool zero_copy;
Config(): bind_point{"tcp://*:5555"},
master_filename{"master_file"},
dcu_count{5}, chans_per_dcu {5},
max_generators{50},
random_seed{static_cast<std::mt19937_64::result_type >(std::time(nullptr))},
data_size{1024*1024*64},
rate_limit{0},
recovery_ms{10000},
threads{1},
chunk_size{100*1024*1024},
verbose{false},
extra_verbose{false},
zero_copy{false}
{}
Config(const Config& other) = default;
Config(Config&& other) = default;
Config& operator=(const Config& other) = default;
Config& operator=(Config&& other) = default;
};
Config parse_args(int argc, char **argv) {
Config cfg{};
for (int i = 1; i < argc; ++i) {
std::string arg{argv[i]};
std::string next_arg;
bool has_next = false;
if (i +1 < argc) {
next_arg = argv[i+1];
has_next = true;
}
if (arg == "--bind") {
if (has_next) {
cfg.bind_point = next_arg;
++i;
}
} else if (arg == "--size") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.data_size;
++i;
}
} else if (arg == "--rate-limit") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.rate_limit;
++i;
}
std::cout << "rate limit = " << cfg.rate_limit << std::endl;
} else if (arg == "--recovery-ms") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.recovery_ms;
++i;
}
} else if (arg == "--threads") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.threads;
++i;
}
} else if (arg == "-v" || arg == "--verbose") {
cfg.verbose = true;
} else if (arg == "-vv") {
cfg.verbose = true;
cfg.extra_verbose = true;
} else if (arg == "-z" || arg == "--zero-copy") {
cfg.zero_copy = true;
} else if (arg == "--chunk") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.chunk_size;
++i;
}
} else {
std::cerr << "Unknown argument " << arg << std::endl;
}
}
return cfg;
};
void simple_send_loop(zmq::socket_t& publisher, const Config& config) {
if (config.data_size < 1024) {
std::cerr << "Data size is too small, please try at least 1KB" << std::endl;
return;
}
const int segments = 16;
const int max_gps_n = 1000 * 1000 * 1000;
const int step = max_gps_n/segments;
long gps=0;
long gps_n=0;
if (config.verbose) {
std::cout << "Sending " << config.data_size * 16 << " bytes in 1/16s chunks" << std::endl;
}
// initialize a full seconds worth of buffers
std::array<std::vector<char>, segments> buffers;
{
// just give each segment a different value
char val = 0;
for (auto& entry:buffers) {
entry.resize(config.data_size, val);
++val;
}
}
get_time(gps, gps_n);
gps++;
gps_n = 0;
int cur_segment = 0;
while(true) {
wait_for(gps, gps_n);
int parts = 0;
size_t total_size = 0;
if (config.zero_copy) {
zmq::message_t msg(buffers[cur_segment].data(), config.data_size, nullptr, nullptr);
long* tmp = reinterpret_cast<long*>(buffers[cur_segment].data());
tmp[0] = gps;
tmp[1] = gps_n;
publisher.send(msg);
parts = 1;
total_size = config.data_size;
} else {
size_t cur = 0;
const size_t chunk_size = config.chunk_size;
while (cur < config.data_size) {
size_t end = cur + chunk_size;
bool last_message = false;
if (end >= config.data_size) {
last_message = true;
end = config.data_size;
}
size_t message_size = end - cur;
total_size += message_size;
zmq::message_t msg(message_size);
char *data = reinterpret_cast<char *>(msg.data());
std::fill(data, data + message_size, static_cast<char>(cur_segment));
if (cur == 0) {
long *tmp = reinterpret_cast<long *>(data);
tmp[0] = gps;
tmp[1] = gps_n;
}
publisher.send(msg, (last_message ? 0 : ZMQ_SNDMORE));
if (config.extra_verbose)
std::cerr << "\t" << message_size << "-" << (last_message ? 0 : ZMQ_SNDMORE) << "\n";
parts++;
cur = end;
}
}
++cur_segment;
if (cur_segment >= segments) {
cur_segment = 0;
}
gps_n += step;
if (gps_n >= max_gps_n) {
++gps;
gps_n = 0;
}
{
long cur_gps, cur_gps_n;
get_time(cur_gps, cur_gps_n);
if (cur_gps > gps || (cur_gps == gps && cur_gps_n >= gps_n)) {
std::cerr << "Late wanted " << gps << ":" << gps_n << " got " << cur_gps << ":" << cur_gps_n << std::endl;
} else if (config.verbose) {
std::cout << "Sent " << total_size << " bytes at " << gps << ":" << gps_n << " by " << cur_gps << ":" << cur_gps_n << " in " << parts << " parts\n";
}
}
}
}
int main(int argc, char **argv) {
Config cfg = parse_args(argc, argv);
std::cout << "Seeding with " << cfg.random_seed << std::endl;
std::mt19937_64 rand_generator{cfg.random_seed};
// for (int i = 0; i < cfg.dcu_count; ++i) {
// std::cout << "Generating dcu " << i << std::endl;
//
// }
// std::cout << "Timing a lot of sin calls" << std::endl;
// auto t0 = std::chrono::steady_clock::now();
// for (auto i = 0; i < 100000; ++i) {
// std::sin(static_cast<double>(1.5));
// }
// auto t1 = std::chrono::steady_clock::now();
// std::chrono::duration<double> delta = t1-t0;
//
// std::cout << "Did lots of sin calls in " << delta.count() << " units of time" << std::endl;
zmq::context_t context(cfg.threads);
zmq::socket_t publisher(context, ZMQ_PUB);
{
if (cfg.rate_limit > 0) {
std::cout << "Setting rate limit to " << cfg.rate_limit << "Kb/s" << std::endl;
publisher.setsockopt(ZMQ_RATE, &cfg.rate_limit, sizeof(cfg.rate_limit));
}
std::cout << "Setting recovery_ivl to " << cfg.recovery_ms << std::endl;
publisher.setsockopt(ZMQ_RECOVERY_IVL, &cfg.recovery_ms, sizeof(cfg.recovery_ms));
int rate = 0;
size_t rate_size = sizeof(rate);
publisher.getsockopt(ZMQ_RATE, &rate, &rate_size);
std::cout << "Rate limit confirmed at " << rate << "Kb/s" << std::endl;
}
if (cfg.verbose)
std::cout << "Binding to " << cfg.bind_point << std::endl;
publisher.bind(cfg.bind_point.c_str());
if (cfg.verbose)
std::cout << "Starting send loop" << std::endl;
simple_send_loop(publisher, cfg);
return 1;
}
//
// Created by jonathan.hanks on 7/26/17.
//
#include <array>
#include <cmath>
#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <vector>
#include <zmq.hpp>
#include "../zmq_daq.h"
#include "firehose_structs.hh"
#include "zmq_firehose_common.hh"
struct Config {
std::string bind_point;
int rate_limit;
int threads;
bool verbose;
bool extra_verbose;
Config(): bind_point{"tcp://127.0.0.1:5555"},
rate_limit{0},
threads{1},
verbose{false},
extra_verbose{false}
{}
Config(const Config& other) = default;
Config(Config&& other) = default;
Config& operator=(const Config& other) = default;
Config& operator=(Config&& other) = default;
};
Config parse_args(int argc, char **argv) {
Config cfg{};
for (int i = 1; i < argc; ++i) {
std::string arg{argv[i]};
std::string next_arg;
bool has_next = false;
if (i +1 < argc) {
next_arg = argv[i+1];
has_next = true;
}
if (arg == "--connect") {
if (has_next)
cfg.bind_point = next_arg;
}
if (arg == "--threads") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.threads;
++i;
}
}
if (arg == "-v" || arg == "--verbose") {
cfg.verbose = true;
}
if (arg == "-vv") {
cfg.verbose = true;
cfg.extra_verbose = true;
}
if (arg == "--rate-limit") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.rate_limit;
++i;
}
}
}
return cfg;
};
void simple_recv_loop(zmq::socket_t& subscriber, const Config& config) {
const int segments = 16;
const int max_gps_n = 1000 * 1000 * 1000;
const int step = max_gps_n/segments;
long expected_gps=0;
long expected_gps_n=0;
while(true) {
zmq::message_t msg(0);
size_t total_size = 0;
long gps = 0, gps_n = 0;
bool error = false;
bool first = true;
int parts = 0;
int more_parts = 0;
size_t opt_len = sizeof(more_parts);
do {
subscriber.recv(&msg);
++parts;
if (first) {
long *tmp = reinterpret_cast<long *>(msg.data());
gps = tmp[0];
gps_n = tmp[1];
first = false;
}
total_size += msg.size();
opt_len = sizeof(more_parts);
subscriber.getsockopt(ZMQ_RCVMORE, &more_parts, &opt_len);
if (config.extra_verbose)
std::cerr << "\t" << msg.size() << "-" << more_parts << "\n";
} while (more_parts != 0);
if (expected_gps != 0) {
if (expected_gps != gps || expected_gps_n != gps_n) {
std::cerr << "ERROR: expecting " << expected_gps << ":" << expected_gps_n << " got " << gps << ":" << gps_n << " instead." << std::endl;
}
}
expected_gps = gps;
expected_gps_n = gps_n + step;
if (expected_gps_n >= max_gps_n) {
++expected_gps;
expected_gps_n = 0;
}
if (config.verbose) {
std::cout << "Recieved " << total_size << " bytes for " << gps << ":" << gps_n << " in " << parts << " parts\n";
}
}
}
int main(int argc, char **argv) {
Config cfg = parse_args(argc, argv);
zmq::context_t context(cfg.threads);
zmq::socket_t subscriber(context, ZMQ_SUB);
{
if (cfg.rate_limit > 0) {
std::cout << "Setting rate limit to " << cfg.rate_limit << "Kb/s" << std::endl;
subscriber.setsockopt(ZMQ_RATE, &cfg.rate_limit, sizeof(cfg.rate_limit));
}
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
int rate = 0;
size_t rate_size = sizeof(rate);
subscriber.getsockopt(ZMQ_RATE, &rate, &rate_size);
std::cout << "Rate limit confirmed at " << rate << "Kb/s" << std::endl;
}
std::cout << "connecting to " << cfg.bind_point << std::endl;
subscriber.connect(cfg.bind_point.c_str());
std::cout << "Beginning receive loop " << std::endl;
simple_recv_loop(subscriber, cfg);
return 1;
}
This diff is collapsed.
//
/// @file zmq_xmit.c
/// @brief Transmit data from a buffer containing a
/// daq_multi_cycle_header_t structure out over
/// a zmq publisher.
///
//
#define _GNU_SOURCE
#define _XOPEN_SOURCE 700
#include <ctype.h>
#include <fcntl.h>
#include <malloc.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "../drv/crc.c"
#include "../include/daqmap.h"
#include "../include/drv/fb.h"
#include "../include/daq_core.h"
#include "../drv/gpstime/gpstime.h"
#include "dc_utils.h"
#include "zmq_transport.h"
#include <assert.h>
#include <zmq.h>
#define MAX_NSYS 24
#define __CDECL
static const int buf_size = DAQ_DCU_BLOCK_SIZE * 2;
daq_multi_cycle_header_t* ifo_header;
extern void* findSharedMemory( char* );
extern void* findSharedMemorySize( char*, int );
int do_verbose = 0;
// int sendLength = 0;
static volatile int keepRunning = 1;
char* ifo;
char* ifo_data;
size_t cycle_data_size;
// ZMQ defines
// char msg_buffer[0x200000];
void* daq_context;
void* daq_publisher;
/*********************************************************************************/
/* U S A G E */
/* */
/*********************************************************************************/
void
Usage( )
{
printf( "Usage of zmq_xmit:\n" );
printf( "zmq_xmit -s <models> <OPTIONS>\n" );
printf(
" -b <buffer> : Name of the mbuf to read data from [local_dc]\n" );
printf( " -m <value> : Local memory buffer size in megabytes\n" );
printf( " -v 1 : Enable verbose output\n" );
printf(
" -e <interface> : Name of the interface to broadcast data through\n" );
printf( " -D <value> : Add a delay in ms to sending the data. Used to "
"spread the load\n" );
printf( " : when working with multiple sending systems. "
"Defaults to 0." );
printf( " -h : This helpscreen\n" );
printf( "\n" );
}
void
zmq_make_connection( char* eport )
{
char loc[ 200 ];
int rc;
// Set up the data publisher socket
daq_context = zmq_ctx_new( );
daq_publisher = zmq_socket( daq_context, ZMQ_PUB );
// sprintf(loc,"%s%d","tcp://*:",DAQ_DATA_PORT);
// sprintf(loc,"%s%s%s%d","tcp://",eport,":",DAQ_DATA_PORT);
if ( !dc_generate_connection_string( loc, eport, sizeof( loc ), 0 ) )
{
fprintf(
stderr, "Unable to generate connection string for '%s'\n", eport );
exit( 1 );
}
dc_set_zmq_options( daq_publisher );
printf( "Binding to '%s'\n", loc );
rc = zmq_bind( daq_publisher, loc );
if ( rc < 0 )
{
fprintf( stderr, "Errno = %d: %s\n", errno, strerror( errno ) );
}
assert( rc == 0 );
printf( "sending data on %s\n", loc );
}
// **********************************************************************************************
/**
* @brief Set the cycle counter to an invalid value.
* @param header pointer to the input block header
* @note Used to force a resync of the counter.
*/
void
reset_cycle_counter( volatile daq_multi_cycle_header_t* header )
{
header->curCycle = 0x50505050;
}
/**
* @brief wait until the data in the input shared mem buffer has the
* @param header pointer to the input block header
* requested cycle counter.
*
* @returns non-zero if it we timeout
*/
int
wait_for_cycle( volatile daq_multi_cycle_header_t* header,
unsigned int requested_cycle )
{
int timeout = 0;
do
{
usleep( 2000 );
if ( header->curCycle == requested_cycle )
{
return 0;
}
++timeout;
} while ( timeout < 500 );
return 1;
}
/**
* @brief Check to see if the requested input data is in the input buffer
* @param header pointer to the input block header
* @param the cycle you are checking data for
* @param max_data_size the max size of the input buffer
* @return 0 if safe, != 0 if overflow
*/
int
data_will_overflow( volatile daq_multi_cycle_header_t* header,
int cycle,
int max_data_size )
{
return ( ( ( cycle + 1 ) * header->cycleDataSize ) > max_data_size );
}
/**
* @brief Return a pointer to the daq_multi_dcu_data_t* for a given cycle
* @param header The main input buffer
* @param cycle The cycle to retrieve data for
* @returns A pointer to the cycles data
* @note does not do bounds checking
*/
volatile daq_multi_dcu_data_t*
get_cycle_data( volatile daq_multi_cycle_header_t* header, int cycle )
{
int offset = cycle * header->cycleDataSize;
volatile char* data_block =
&( ( (volatile daq_multi_cycle_data_t*)header )->dataBlock[ 0 ] );
return (volatile daq_multi_dcu_data_t*)( data_block + offset );
}
// **********************************************************************************************
// Capture SIGHALT from ControlC
void
intHandler( int dummy )
{
keepRunning = 0;
}
// **********************************************************************************************
void
print_diags( volatile daq_multi_dcu_data_t* cycle_block )
{
printf( "Total models %d\nData size %d\n\n",
(int)cycle_block->header.dcuTotalModels,
(int)cycle_block->header.fullDataBlockSize );
}
/*********************************************************************************/
/* M A I N */
/* */
/*********************************************************************************/
int __CDECL
main( int argc, char* argv[] )
{
int counter = 0;
int max_data_size_mb = 64;
int max_data_size = 0;
int error = 0;
char* eport = 0;
char* buffer_name = "local_dc";
int send_delay_ms = 0;
if ( argc < 3 )
{
Usage( );
return ( -1 );
}
/* Get the parameters */
while ( ( counter = getopt( argc, argv, "b:m:v:e:l:hD:" ) ) != EOF )
{
switch ( counter )
{
case 'b':
buffer_name = optarg;
break;
case 'm':
max_data_size_mb = atoi( optarg );
if ( max_data_size_mb < 20 )
{
return -1;
}
if ( max_data_size_mb > 100 )
{
return -1;
}
break;
case 'v':
do_verbose = atoi( optarg );
break;
case 'e':
eport = optarg;
break;
case 'l':
if ( 0 == freopen( optarg, "w", stdout ) )
{
perror( "freopen" );
exit( 1 );
}
setvbuf( stdout, NULL, _IOLBF, 0 );
stderr = stdout;
break;
case 'h':
Usage( );
return ( 0 );
case 'D':
send_delay_ms = atoi( optarg );
break;
}
}
max_data_size = max_data_size_mb * 1024 * 1024;
zmq_make_connection( eport );
// Get pointers to local DAQ mbuf
ifo_header = findSharedMemorySize( buffer_name, max_data_size_mb );
signal( SIGINT, intHandler );
// Enter infinite loop of reading control model data and writing to local
// shared memory
reset_cycle_counter( ifo_header );
int cycle = 0;
do
{
if ( wait_for_cycle( ifo_header, cycle ) != 0 )
{
fprintf( stderr, "Unable to sync with data" );
break;
}
if ( data_will_overflow( ifo_header, cycle, max_data_size ) )
{
fprintf( stderr,
"Overflow, required data is out of the input buffer" );
break;
}
if ( send_delay_ms > 0 )
{
usleep( 1000 * send_delay_ms );
}
zmq_send_daq_multi_dcu_t(
(daq_multi_dcu_data_t*)get_cycle_data( ifo_header, cycle ),
daq_publisher,
0 );
if ( cycle == 0 && do_verbose )
{
print_diags( get_cycle_data( ifo_header, cycle ) );
}
++cycle;
cycle %= 16;
} while ( error == 0 && keepRunning == 1 );
printf( "Closing out ZMQ and exiting\n" );
zmq_close( daq_publisher );
zmq_ctx_destroy( daq_context );
return 0;
}