Skip to content
Snippets Groups Projects
Commit e56f79b6 authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Work on a zmq data rate test.

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4421 6dcd42c9-f523-4c6d-aada-af552506706e
parent abe20d30
No related branches found
No related tags found
No related merge requests found
......@@ -28,7 +28,12 @@ add_executable(zmq_rcv_from_dc zmq_rcv_from_dc.c)
target_link_libraries(zmq_rcv_from_dc PUBLIC zmq4::zmq)
target_compile_options(zmq_rcv_from_dc PUBLIC -std=gnu99)
add_executable(zmq_daq_test_data_firehose tests/zmq_firehose.cc)
add_executable(zmq_daq_test_data_firehose tests/zmq_firehose_pub.cc tests/zmq_firehose_common.cc)
target_include_directories(zmq_daq_test_data_firehose PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/tests)
target_link_libraries(zmq_daq_test_data_firehose PUBLIC zmq4::zmq)
target_compile_features(zmq_daq_test_data_firehose PUBLIC cxx_defaulted_functions)
add_executable(zmq_daq_test_data_firehose_recv tests/zmq_firehose_sub.cc tests/zmq_firehose_common.cc)
target_include_directories(zmq_daq_test_data_firehose_recv PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/tests)
target_link_libraries(zmq_daq_test_data_firehose_recv PUBLIC zmq4::zmq)
target_compile_features(zmq_daq_test_data_firehose_recv PUBLIC cxx_defaulted_functions)
//
// Created by jonathan.hanks on 7/26/17.
//
#include <cmath>
#include <ctime>
#include <chrono>
#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <zmq.hpp>
#include "../zmq_daq.h"
#include "firehose_structs.hh"
struct Config {
std::string bind_point;
std::string master_filename;
int dcu_count;
int chans_per_dcu;
int max_generators;
std::mt19937_64::result_type random_seed;
Config(): bind_point{"tcp://*:55555"},
master_filename{"master_file"},
dcu_count{5}, chans_per_dcu {5},
max_generators{50},
random_seed{static_cast<std::mt19937_64::result_type >(std::time(nullptr))}
{}
Config(const Config& other) = default;
Config(Config&& other) = default;
Config& operator=(const Config& other) = default;
Config& operator=(Config&& other) = default;
};
std::string get_publisher_address() {
std::ostringstream os;
os << "tcp://*:" << DAQ_DATA_PORT;
return os.str();
}
Config parse_args(int argc, char **argv) {
return Config{};
};
int main(int argc, char **argv) {
Config cfg = parse_args(argc, argv);
std::cout << "Seeding with " << cfg.random_seed << std::endl;
std::mt19937_64 rand_generator{cfg.random_seed};
for (int i = 0; i < cfg.dcu_count; ++i) {
std::cout << "Generating dcu " << i << std::endl;
}
std::cout << "Timing a lot of sin calls" << std::endl;
auto t0 = std::chrono::steady_clock::now();
for (auto i = 0; i < 100000; ++i) {
std::sin(static_cast<double>(1.5));
}
auto t1 = std::chrono::steady_clock::now();
std::chrono::duration<double> delta = t1-t0;
std::cout << "Did lots of sin calls in " << delta.count() << " units of time" << std::endl;
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind(get_publisher_address().c_str());
return 1;
}
//
// Created by jonathan.hanks on 7/27/17.
//
#include <time.h>
#include <unistd.h>
extern "C" {
void get_time(long &gps, long &gps_n) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
gps = static_cast<long>(ts.tv_sec);
gps_n = static_cast<long>(ts.tv_nsec);
}
void wait_for(long gps, long gps_n) {
if (gps == 0) {
long dummy;
get_time(gps, dummy);
}
while (true) {
long cur_gps, cur_gps_n;
get_time(cur_gps, cur_gps_n);
if (gps < cur_gps) break;
if (gps == cur_gps && gps_n <= cur_gps_n) break;
usleep(1);
}
}
}
\ No newline at end of file
//
// Created by jonathan.hanks on 7/27/17.
//
#ifndef DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H
#define DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
extern void get_time(long &gps, long &gps_n);
extern void wait_for(long gps, long gps_n);
#ifdef __cplusplus
}
#endif
#endif //DAQD_TRUNK_ZMQ_FIREHOSE_COMMON_H
//
// Created by jonathan.hanks on 7/26/17.
//
#include <array>
#include <cmath>
#include <ctime>
//#include <chrono>
#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <vector>
#include <zmq.hpp>
#include "../zmq_daq.h"
#include "firehose_structs.hh"
#include "zmq_firehose_common.hh"
struct Config {
std::string bind_point;
std::string master_filename;
int dcu_count;
int chans_per_dcu;
int max_generators;
std::mt19937_64::result_type random_seed;
size_t data_size;
bool verbose;
Config(): bind_point{"tcp://*:5555"},
master_filename{"master_file"},
dcu_count{5}, chans_per_dcu {5},
max_generators{50},
random_seed{static_cast<std::mt19937_64::result_type >(std::time(nullptr))},
data_size{1024*1024*64},
verbose{false}
{}
Config(const Config& other) = default;
Config(Config&& other) = default;
Config& operator=(const Config& other) = default;
Config& operator=(Config&& other) = default;
};
Config parse_args(int argc, char **argv) {
Config cfg{};
for (int i = 1; i < argc; ++i) {
std::string arg{argv[i]};
std::string next_arg;
bool has_next = false;
if (i +1 < argc) {
next_arg = argv[i+1];
has_next = true;
}
if (arg == "--bind") {
if (has_next)
cfg.bind_point = next_arg;
}
if (arg == "--size") {
if (has_next) {
std::istringstream is{next_arg};
is >> cfg.data_size;
}
}
if (arg == "-v" || arg == "--verbose") {
cfg.verbose = true;
}
}
return cfg;
};
void simple_send_loop(zmq::socket_t& publisher, const Config& config) {
if (config.data_size < 1024*1024) {
std::cerr << "Data size is too small, please try at least 1MB" << std::endl;
return;
}
const int segments = 16;
const int max_gps_n = 1000 * 1000 * 1000;
const int step = max_gps_n/segments;
long gps=0;
long gps_n=0;
// initialize a full seconds worth of buffers
std::array<std::vector<char>, segments> buffers;
{
// just give each segment a different value
char val = 0;
for (auto& entry:buffers) {
entry.resize(config.data_size, val);
++val;
}
}
get_time(gps, gps_n);
gps++;
gps_n = 0;
int cur_segment = 0;
while(true) {
wait_for(gps, gps_n);
// write the time into the buffer
{
long* tmp = reinterpret_cast<long*>(buffers[cur_segment].data());
tmp[0] = gps;
tmp[1] = gps_n;
}
zmq::message_t msg(buffers[cur_segment].data(), config.data_size, nullptr, nullptr);
publisher.send(msg);
++cur_segment;
if (cur_segment >= segments) {
cur_segment = 0;
}
gps_n += step;
if (gps_n >= max_gps_n) {
++gps;
gps_n = 0;
}
{
long cur_gps, cur_gps_n;
get_time(cur_gps, cur_gps_n);
if (cur_gps > gps || (cur_gps == gps && cur_gps_n >= gps_n)) {
std::cerr << "Late wanted " << gps << ":" << gps_n << " got " << cur_gps << ":" << cur_gps_n << std::endl;
} else if (config.verbose) {
std::cout << "Sent " << gps << ":" << gps_n << " by " << cur_gps << ":" << cur_gps_n << "\n";
}
}
}
}
int main(int argc, char **argv) {
Config cfg = parse_args(argc, argv);
std::cout << "Seeding with " << cfg.random_seed << std::endl;
std::mt19937_64 rand_generator{cfg.random_seed};
// for (int i = 0; i < cfg.dcu_count; ++i) {
// std::cout << "Generating dcu " << i << std::endl;
//
// }
// std::cout << "Timing a lot of sin calls" << std::endl;
// auto t0 = std::chrono::steady_clock::now();
// for (auto i = 0; i < 100000; ++i) {
// std::sin(static_cast<double>(1.5));
// }
// auto t1 = std::chrono::steady_clock::now();
// std::chrono::duration<double> delta = t1-t0;
//
// std::cout << "Did lots of sin calls in " << delta.count() << " units of time" << std::endl;
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind(cfg.bind_point.c_str());
simple_send_loop(publisher, cfg);
return 1;
}
//
// Created by jonathan.hanks on 7/26/17.
//
#include <array>
#include <cmath>
#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <vector>
#include <zmq.hpp>
#include "../zmq_daq.h"
#include "firehose_structs.hh"
#include "zmq_firehose_common.hh"
struct Config {
std::string bind_point;
bool verbose;
Config(): bind_point{"tcp://127.0.0.1:5555"},
verbose{false}
{}
Config(const Config& other) = default;
Config(Config&& other) = default;
Config& operator=(const Config& other) = default;
Config& operator=(Config&& other) = default;
};
Config parse_args(int argc, char **argv) {
Config cfg{};
for (int i = 1; i < argc; ++i) {
std::string arg{argv[i]};
std::string next_arg;
bool has_next = false;
if (i +1 < argc) {
next_arg = argv[i+1];
has_next = true;
}
if (arg == "--bind") {
if (has_next)
cfg.bind_point = next_arg;
}
if (arg == "-v" || arg == "--verbose") {
cfg.verbose = true;
}
}
return cfg;
};
void simple_recv_loop(zmq::socket_t& subscriber, const Config& config) {
const int segments = 16;
const int max_gps_n = 1000 * 1000 * 1000;
const int step = max_gps_n/segments;
long expected_gps=0;
long expected_gps_n=0;
while(true) {
zmq::message_t msg(0);
subscriber.recv(&msg);
long gps, gps_n;
bool error = false;
{
long *tmp = reinterpret_cast<long *>(msg.data());
gps = tmp[0];
gps_n = tmp[1];
if (expected_gps != 0) {
if (expected_gps != gps || expected_gps_n != gps_n) {
std::cerr << "ERROR: expecting " << expected_gps << ":" << expected_gps_n << " got " << gps << ":" << gps_n << " instead." << std::endl;
}
}
expected_gps = gps;
expected_gps_n = gps_n + step;
if (expected_gps_n >= max_gps_n) {
++expected_gps;
expected_gps_n = 0;
}
}
if (config.verbose) {
std::cout << "Recieved " << msg.size() << " bytes for " << gps << ":" << gps_n << std::endl;
}
}
}
int main(int argc, char **argv) {
Config cfg = parse_args(argc, argv);
zmq::context_t context(1);
zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber.connect(cfg.bind_point.c_str());
simple_recv_loop(subscriber, cfg);
return 1;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment