Skip to content
Snippets Groups Projects
Commit b7b450ea authored by Rolf Bork's avatar Rolf Bork
Browse files

- Fixed rcv buff size error in zmq_daq.h.

- Started multi socket polling software in zmq_multi_rcvr.c. Still need to finish.


git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4338 6dcd42c9-f523-4c6d-aada-af552506706e
parent a88029da
No related branches found
No related tags found
No related merge requests found
......@@ -25,7 +25,7 @@ typedef struct daq_data_t_v1 {
//
#define DAQ_ZMQ_MODELS_PER_FE 6
#define DAQ_ZMQ_DCU_SIZE 0x1000000
#define DAQ_ZMQ_BLOCK_SIZE (DAQ_DCU_SIZE/DAQ_NUM_DATA_BLOCKS)
#define DAQ_ZMQ_BLOCK_SIZE (DAQ_ZMQ_DCU_SIZE/DAQ_NUM_DATA_BLOCKS)
#define DAQ_DATA_PORT 5555
#define DAQ_GDS_DATA_PORt 5556
//
......
......@@ -22,6 +22,7 @@
#include <zmq.h>
#include <assert.h>
#include "zmq_daq.h"
#include "../include/daqmap.h"
int do_verbose;
......@@ -40,8 +41,7 @@ usage()
}
unsigned int nsys; // The number of mapped shared memories (number of data sources)
static const int buf_size = DAQ_DCU_BLOCK_SIZE * 2;
unsigned int nsys = 1; // The number of mapped shared memories (number of data sources)
static const int header_size = sizeof(daq_msg_header_t) + 4;
int
......@@ -50,6 +50,7 @@ main(int argc, char **argv)
char *sysname;
char *sname[DCU_COUNT];
int c;
extern char *optarg;
......@@ -58,6 +59,7 @@ main(int argc, char **argv)
daq_multi_dcu_data_t mxDataBlock;
// Declare pointer to local memory message area
char *daqbuffer = (char *)&mxDataBlock;
printf("size of mxdata = %d\n",sizeof(mxDataBlock));
/* set up defaults */
......@@ -68,10 +70,12 @@ main(int argc, char **argv)
int ii;
// Declare 0MQ message pointers
void *daq_context;
void *daq_subscriber;
void *daq_context[DCU_COUNT];
void *daq_subscriber[DCU_COUNT];
int rc;
zmq_msg_t message;
zmq_pollitem_t daq_items[DCU_COUNT];
char loc[20];
// Test pointer to cpu meter data
int *cpu_meter;
......@@ -97,18 +101,38 @@ main(int argc, char **argv)
printf("Server name: %s\n", sysname);
sname[0] = strtok(sysname, " ");
for(;;) {
printf("%s\n", sname[nsys - 1]);
char *s = strtok(0, " ");
if (!s) break;
sname[nsys] = s;
nsys++;
}
// Make 0MQ socket connection
daq_context = zmq_ctx_new();
daq_subscriber = zmq_socket (daq_context,ZMQ_SUB);
char loc[20];
sprintf(loc,"%s%s%s%d","tcp://",sysname,":",DAQ_DATA_PORT);
rc = zmq_connect (daq_subscriber, loc);
assert (rc == 0);
// Subscribe to all data from the server
rc = zmq_setsockopt(daq_subscriber,ZMQ_SUBSCRIBE,"",0);
assert (rc == 0);
printf("nsys = %d\n",nsys);
for(ii=0;ii<nsys;ii++) {
printf("sys %d = %s\n",ii,sname[ii]);
}
// Make 0MQ socket connection
for(ii=0;ii<nsys;ii++) {
// Make 0MQ socket connection
daq_context[ii] = zmq_ctx_new();
daq_subscriber[ii] = zmq_socket (daq_context[ii],ZMQ_SUB);
sprintf(loc,"%s%s%s%d","tcp://",sname[ii],":",DAQ_DATA_PORT);
printf("sys %d = %s\n",ii,loc);
rc = zmq_connect (daq_subscriber[ii], loc);
assert (rc == 0);
// Subscribe to all data from the server
rc = zmq_setsockopt(daq_subscriber[ii],ZMQ_SUBSCRIBE,"",0);
assert (rc == 0);
daq_items[ii].socket = daq_subscriber[ii];
daq_items[ii].fd = 0;
daq_items[ii].events = ZMQ_POLLIN;
daq_items[ii].revents = 0;
}
// Receive DAQ data in an infinite loop ***********************************
......@@ -116,15 +140,20 @@ main(int argc, char **argv)
do {
// Initialize 0MQ message buffer
zmq_msg_init(&message);
// Get data when message size > 0
size = zmq_msg_recv(&message,daq_subscriber,0);
assert(size >= 0);
// Get pointer to message data
char *string = (char *)zmq_msg_data(&message);
// Copy data out of 0MQ message buffer to local memory buffer
memcpy(daqbuffer,string,size);
// Destroy the received message buffer
zmq_msg_close(&message);
zmq_poll(daq_items,nsys,-1);
for(ii=0;ii<nsys;ii++) {
if(daq_items[ii].revents & ZMQ_POLLIN) {
// Get data when message size > 0
size = zmq_msg_recv(&message,daq_subscriber[ii],0);
assert(size >= 0);
// Get pointer to message data
char *string = (char *)zmq_msg_data(&message);
// Copy data out of 0MQ message buffer to local memory buffer
memcpy(daqbuffer,string,size);
// Destroy the received message buffer
zmq_msg_close(&message);
}
}
// *******************************************************************
// Following is test finding cpu meter data
// Set data pointer to start of received data block
......@@ -152,11 +181,14 @@ main(int argc, char **argv)
}
}
// *******************************************************************
myErrorSignal ++;
}while(!myErrorSignal);
}while(myErrorSignal < 320);
zmq_close(daq_subscriber);
zmq_ctx_destroy(daq_context);
for(ii=0;ii<nsys;ii++) {
zmq_close(daq_subscriber[ii]);
zmq_ctx_destroy(daq_context[ii]);
}
exit(0);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment