Skip to content
Snippets Groups Projects
Commit 227bbf6a authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Adding optional source interface specifications to the zmq_rcv_ix_xmit

This is an attempt to allow load balancing of incoming connections
across multiple interfaces.

Added a -L option which would allow specifying multiple
interfaces.  Each subscription gets a specific source interface
via a round robin selection.

If not specified, zmq figures out interface to use.

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4774 6dcd42c9-f523-4c6d-aada-af552506706e
parent 93414b48
No related branches found
No related tags found
No related merge requests found
......@@ -47,16 +47,19 @@
#include "./dolphin_common.c"
extern void *findSharedMemorySize(char *,int);
int do_verbose = 0;
int thread_index[DCU_COUNT];
// int thread_index[DCU_COUNT];
struct thread_info {
int index;
char *src_iface;
};
struct thread_info thread_index[DCU_COUNT];
void *daq_context[DCU_COUNT];
void *daq_subscriber[DCU_COUNT];
char *sname[DCU_COUNT]; // Names of FE computers serving DAQ data
char *local_iface[32];
daq_multi_dcu_data_t mxDataBlockSingle[32];
const int mc_header_size = sizeof(daq_multi_cycle_header_t);
int stop_working_threads = 0;
......@@ -70,13 +73,15 @@ void
usage()
{
fprintf(stderr, "Usage: zmq_rcv_ix_xmit [args] -s server names -m shared memory size -g IX channel \n");
fprintf(stderr, "-l filename - log file name\n");
fprintf(stderr, "-s - server names eg x1lsc0, x1susex, etc.\n");
fprintf(stderr, "-l filename - log file name\n");
fprintf(stderr, "-b buffername - name of the mbuf to write to\n");
fprintf(stderr, "-s - server names separated by spaces eg \"x1lsc0 x1susex etc.\"\n");
fprintf(stderr, "-v - verbose prints diag test data\n");
fprintf(stderr, "-g - Dolphin IX channel to xmit on\n");
fprintf(stderr, "-p - Debug pv prefix, requires -P as well\n");
fprintf(stderr, "-P - Path to a named pipe to send PV debug information to\n");
fprintf(stderr, "-d - Max delay in milli seconds to wait for a FE to send data, defaults to 10\n");
fprintf(stderr, "-L ifaces - local interfaces to listen on [used for load balancing], eg \"eth0 eth1\"\n");
fprintf(stderr, "-h - help\n");
}
......@@ -153,9 +158,9 @@ void *rcvr_thread_mon(void *ctx)
// Thread for receiving DAQ data via ZMQ
// *************************************************************************
void *rcvr_thread(void *arg) {
int *mythread = (int *)arg;
int mt = *mythread;
printf("myarg = %d\n",mt);
struct thread_info* my_info = (struct thread_info*)arg;
int mt = my_info->index;
printf("myarg = %d on iface %s\n", mt, (my_info->src_iface ? my_info->src_iface : "default interface"));
zmq_msg_t message;
int cycle = 0;
daq_multi_dcu_data_t *mxDataBlock;
......@@ -176,7 +181,7 @@ void *rcvr_thread(void *arg) {
rc = zmq_setsockopt(zsocket, ZMQ_SUBSCRIBE, "", 0);
assert(rc == 0);
if (!dc_generate_connection_string(loc, sname[mt], sizeof(loc))) {
if (!dc_generate_connection_string(loc, sname[mt], sizeof(loc), my_info->src_iface)) {
fprintf(stderr, "Unable to create connection string for '%s'\n", sname[mt]);
exit(1);
}
......@@ -222,6 +227,8 @@ main(int argc, char **argv)
int ii; // Loop counter
int delay_ms = 10;
int delay_cycles = 0;
unsigned int niface = 0; // The number of local interfaces to split receives across.
char *local_iface_names = 0;
extern char *optarg; // Needed to get arguments to program
......@@ -253,7 +260,7 @@ main(int argc, char **argv)
// Get arguments sent to process
while ((c = getopt(argc, argv, "b:hs:m:g:vp:P:d:")) != EOF) switch(c) {
while ((c = getopt(argc, argv, "b:hs:m:g:vp:P:d:L:")) != EOF) switch(c) {
case 's':
sysname = optarg;
break;
......@@ -292,6 +299,9 @@ main(int argc, char **argv)
}
break;
case 'h':
case 'L':
local_iface_names = optarg;
break;
default:
usage();
exit(1);
......@@ -320,6 +330,20 @@ main(int argc, char **argv)
nsys++;
}
// Parse local interface names
if (local_iface_names) {
local_iface[0] = strtok(local_iface_names, " ");
niface++;
for (;;) {
printf("local interface %s\n", local_iface[niface-1]);
char *s = strtok(0, " ");
if (!s) break;
assert(niface < 32);
local_iface[niface] = s;
niface++;
}
}
// Get pointers to local DAQ mbuf
ifo = (char *)findSharedMemorySize(buffer_name,max_data_size_mb);
ifo_header = (daq_multi_cycle_header_t *)ifo;
......@@ -342,7 +366,8 @@ main(int argc, char **argv)
// Make 0MQ socket connections
for(ii=0;ii<nsys;ii++) {
// Create a thread to receive data from each data server
thread_index[ii] = ii;
thread_index[ii].index = ii;
thread_index[ii].src_iface = (niface ? local_iface[ii % niface]: 0);
pthread_create(&thread_id[ii],NULL,rcvr_thread,(void *)&thread_index[ii]);
}
......
......@@ -6,7 +6,7 @@
#include <zmq.h>
int dc_generate_connection_string(char *dest, const char *src, size_t dest_len)
int dc_generate_connection_string(char *dest, const char *src, size_t dest_len, const char* src_iface)
{
static const char *proto_sep = "://";
static const char *port_sep = ":";
......@@ -59,7 +59,8 @@ int dc_generate_connection_string(char *dest, const char *src, size_t dest_len)
if (port == 0) {
port = DAQ_DATA_PORT;
}
snprintf(dest, dest_len, "%s://%s:%d", proto, tmp_name, port);
snprintf(dest, dest_len, "%s://%s%s%s:%d", proto, (src_iface? src_iface : ""), (src_iface? ":0;" : ""), tmp_name, port);
return 1;
}
......
......@@ -12,11 +12,13 @@ extern "C" {
* @param dest Destination buffer of dest_len bytes
* @param src Input string
* @param dest_len Size of *dest
* @param src_iface Optional source interface name, used to identify which interface the connection
* should originate from.
* @return 0 on failure, 1 on success. On failure the contents of *dest are undefined.
* @note This looks for protocol, hostname, port of the form protocol://hostname:port
* and applies default values if no protocol or port is found.
*/
extern int dc_generate_connection_string(char *dest, const char *src, size_t dest_len);
extern int dc_generate_connection_string(char *dest, const char *src, size_t dest_len, const char* src_iface);
extern int dc_set_zmq_options(void *z_socket);
......
......@@ -97,7 +97,7 @@ void zmq_make_connection(char *eport)
daq_publisher = zmq_socket (daq_context,ZMQ_PUB);
// sprintf(loc,"%s%d","tcp://*:",DAQ_DATA_PORT);
//sprintf(loc,"%s%s%s%d","tcp://",eport,":",DAQ_DATA_PORT);
if (!dc_generate_connection_string(loc, eport, sizeof(loc))) {
if (!dc_generate_connection_string(loc, eport, sizeof(loc), 0)) {
fprintf(stderr, "Unable to generate connection string for '%s'\n", eport);
exit(1);
}
......
......@@ -124,7 +124,7 @@ void *rcvr_thread(void *arg) {
rc = zmq_setsockopt(zsocket, ZMQ_SUBSCRIBE, "", 0);
assert(rc == 0);
if (!dc_generate_connection_string(loc, sname[mt], sizeof(loc))) {
if (!dc_generate_connection_string(loc, sname[mt], sizeof(loc), 0)) {
fprintf(stderr, "Unable to parse endpoint name '%s'\n", sname[mt]);
exit(1);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment