Skip to content
Snippets Groups Projects
Commit 2178ad18 authored by Rolf Bork's avatar Rolf Bork
Browse files

zmq_proxy and zmq_proxy_client now functional for testing.

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4354 6dcd42c9-f523-4c6d-aada-af552506706e
parent bea04208
No related branches found
No related tags found
No related merge requests found
......@@ -78,5 +78,17 @@ typedef struct gds_multi_dcu_data_t {
char zmqDataBlock[DAQ_ZMQ_FE_DATA_BLOCK_SIZE];
}gds_multi_dcu_data_t;
typedef struct channel_t {
char name[64];
int type;
int datarate;
int datasize;
}channel_t;
typedef struct nds_data_t {
channel_t ndschan;
char ndsdata[10000];
} nds_data_t;
#define DAQ_ZMQ_HEADER_SIZE (sizeof(daq_msg_header_t) * DAQ_ZMQ_MAX_DCU + sizeof(int))
//
/// @file zmq_multi_rcvr.c
/// @brief Test DAQ data receiver using ZeroMQ.
//
#include <unistd.h>
#include <ctype.h>
#include <sys/time.h>
......@@ -23,56 +18,27 @@
#include <assert.h>
#include "zmq_daq.h"
int do_verbose;
unsigned int do_wait = 0; // Wait for this number of milliseconds before starting a cycle
extern void *findSharedMemory(char *);
void
usage()
{
fprintf(stderr, "Usage: zmq_multi_rcvr [args] -s server name\n");
fprintf(stderr, "-l filename - log file name\n");
fprintf(stderr, "-s - server name eg x1lsc0, x1susex, etc.\n");
fprintf(stderr, "-v - verbose prints cpu_meter test data\n");
fprintf(stderr, "-h - help\n");
static volatile int keepRunning = 1;
void intHandler(int dummy) {
keepRunning = 0;
}
unsigned int nsys; // The number of mapped shared memories (number of data sources)
static const int buf_size = DAQ_DCU_BLOCK_SIZE * 2;
static const int header_size = sizeof(daq_msg_header_t) + 4;
typedef struct channel_t {
char name[64];
int type;
int value;
}channel_t;
int
main(int argc, char **argv)
int readinifile(char *filename,channel_t ndsdata[])
{
char *sysname;
int c;
extern char *optarg;
channel_t ndsdata[2000];
channel_t *ndsptr;
int lft = 0;
int ii;
FILE *fr;
char line[80];
int ii = 0;
int totalchans = 0;
char tmpname[64];
int tmpdatatype = 0;
char tmpname[60];
int tmpdatarate = 0;
int lft = 0;
int totalchans = 0;
int totalrate = 0;
int epicstotal = 0;
fr = fopen("/opt/rtcds/tst/x1/chans/daq/X1ATS.ini","r");
fr = fopen(filename,"r");
if(fr == NULL) return(-1);
while(fgets(line,80,fr) != NULL) {
if(strstr(line,"X1") != NULL && strstr(line,"#") == NULL) {
if(strstr(line,"X2") != NULL && strstr(line,"#") == NULL) {
int sl = strlen(line) - 2;
memmove(line, line+1, sl);
line[sl-1] = 0;
......@@ -81,144 +47,178 @@ main(int argc, char **argv)
lft = 1;
}
if(strstr(line,"datarate") != NULL && strstr(line,"#") == NULL && lft) {
if(strstr(line,"16") != NULL) tmpdatarate = 16;
else lft = 0;
if(strstr(line,"16") != NULL && strstr(line,"16384") == NULL) {
tmpdatarate = 16;
epicstotal ++;
}
if(strstr(line,"64") != NULL) tmpdatarate = 64;
if(strstr(line,"128") != NULL) tmpdatarate = 128;
if(strstr(line,"256") != NULL) tmpdatarate = 256;
if(strstr(line,"512") != NULL) tmpdatarate = 512;
if(strstr(line,"1024") != NULL) tmpdatarate = 1024;
if(strstr(line,"2048") != NULL) tmpdatarate = 2048;
if(strstr(line,"4096") != NULL) tmpdatarate = 4096;
if(strstr(line,"8192") != NULL) tmpdatarate = 8192;
if(strstr(line,"16384") != NULL) tmpdatarate = 16384;
if(strstr(line,"32768") != NULL) tmpdatarate = 32768;
if(strstr(line,"65536") != NULL) tmpdatarate = 65536;
}
if(strstr(line,"datatype") != NULL && strstr(line,"#") == NULL && lft) {
if(strstr(line,"2") != NULL) {
sprintf(ndsdata[totalchans].name,"%s",tmpname);
ndsdata[totalchans].type = 2;
totalchans ++;
ndsdata[totalchans].datasize = (4 * tmpdatarate) /16;;
totalrate += (4 * tmpdatarate);
}
if(strstr(line,"4") != NULL) {
ndsdata[totalchans].type = 4;
ndsdata[totalchans].datasize = (4 * tmpdatarate) /16;;
totalrate += (4 * tmpdatarate);
}
if(strstr(line,"5") != NULL) {
ndsdata[totalchans].type = 5;
ndsdata[totalchans].datasize = (8 * tmpdatarate) /16;;
totalrate += (8 * tmpdatarate);
}
sprintf(ndsdata[totalchans].name,"%s",tmpname);
ndsdata[totalchans].datarate = tmpdatarate;
totalchans ++;
lft = 0;
}
}
printf("Total chans = %d\n",totalchans);
for(ii = 0;ii<totalchans;ii++)
printf("%s\t%d\n",ndsdata[ii].name,ndsdata[ii].type);
// Create DAQ message area in local memory
daq_multi_dcu_data_t mxDataBlock;
// Declare pointer to local memory message area
char *daqbuffer = (char *)&mxDataBlock;
char ndsbuffer[1000];
/* set up defaults */
sysname = NULL;
int myErrorSignal = 0;
int size;
char *dataPtr;
int jj;
// Declare 0MQ message pointers
void *daq_context;
void *daq_subscriber;
void *nds_context;
void *nds_publisher;
int rc;
zmq_msg_t message;
// Test pointer to cpu meter data
int *cpu_meter;
int mycpu[6];
while ((c = getopt(argc, argv, "hd:s:l:Vvw:x")) != EOF) switch(c) {
case 's':
sysname = optarg;
break;
case 'v':
do_verbose = 1;
break;
case 'w':
do_wait = atoi(optarg);
break;
case 'h':
default:
usage();
exit(1);
}
if (sysname == NULL) { usage(); exit(1); }
printf("Server name: %s\n", sysname);
printf("%s\t%d\t%d\t%d\n",ndsdata[ii].name,ndsdata[ii].type,ndsdata[ii].datarate,
ndsdata[ii].datasize);
int fastchans = totalchans - epicstotal;
printf("Total chans = %d\nTotal Epics = %d\nTotalrate = %d\n",fastchans,epicstotal,totalrate);
return(totalchans);
}
void
usage()
{
fprintf(stderr, "Usage: zmq_multi_rcvr [args] -s server name\n");
fprintf(stderr, "-l filename - log file name\n");
fprintf(stderr, "-s - server name eg x1lsc0, x1susex, etc.\n");
fprintf(stderr, "-v - verbose prints cpu_meter test data\n");
fprintf(stderr, "-h - help\n");
}
int main(int argc, char **argv)
{
// Make 0MQ socket connection
daq_context = zmq_ctx_new();
daq_subscriber = zmq_socket (daq_context,ZMQ_SUB);
char loc[20];
sprintf(loc,"%s%s%s%d","tcp://",sysname,":",DAQ_DATA_PORT);
rc = zmq_connect (daq_subscriber, loc);
assert (rc == 0);
// Subscribe to all data from the server
rc = zmq_setsockopt(daq_subscriber,ZMQ_SUBSCRIBE,"",0);
assert (rc == 0);
printf("Rcv data on %s\n",loc);
char *sysname;
char *modname;
extern char *optarg;
sysname = NULL;
modname = NULL;
channel_t mydata[8000];
int c;
int ii;
char filename[256];
char basedir[128];
int num_chans = 0;
daq_multi_dcu_data_t mxDataBlock;
char *daqbuffer = (char *)&mxDataBlock;
char msgbuffer[10000];
nds_data_t ndsbuffer;
char *ndsptr = (char *) &ndsbuffer;
void *daq_context;
void *daq_subscriber;
void *nds_context;
void *nds_publisher;
int rc;
int size;
zmq_msg_t message;
sprintf(basedir,"%s","/opt/rtcds/tst/x2/chans/daq/");
while ((c = getopt(argc, argv, "hd:s:l:d:Vvw:x")) != EOF) switch(c) {
case 's':
sysname = optarg;
printf("sysname = %s\n",sysname);
break;
case 'd':
modname = optarg;
printf("modname = %s\n",modname);
break;
case 'h':
default:
usage();
exit(1);
}
if (sysname == NULL || modname == NULL) { usage(); exit(1); }
for(ii=0;modname[ii] != '\0';ii++) {
if(islower(modname[ii])) modname[ii] = toupper(modname[ii]);
}
signal(SIGINT,intHandler);
sprintf(filename,"%s%s%s",basedir,modname,".ini");
printf("reading %s\n",filename);
num_chans = readinifile(filename,mydata);
// Set up to rcv
daq_context = zmq_ctx_new();
daq_subscriber = zmq_socket (daq_context,ZMQ_SUB);
char loc[32];
sprintf(loc,"%s%s%s%d","tcp://",sysname,":",DAQ_DATA_PORT);
rc = zmq_connect (daq_subscriber, loc);
assert (rc == 0);
rc = zmq_setsockopt(daq_subscriber,ZMQ_SUBSCRIBE,"",0);
assert (rc == 0);
printf("Rcv data on %s\n",loc);
nds_context = zmq_ctx_new();
nds_publisher = zmq_socket (nds_context,ZMQ_PUB);
sprintf(loc,"%s","tcp://*:6666");
rc = zmq_bind (nds_publisher,loc);
assert (rc == 0);
printf("send data on %s\n",loc);
nds_publisher = zmq_socket (nds_context,ZMQ_PUB);
sprintf(loc,"%s%d","tcp://eth2:",DAQ_DATA_PORT);
rc = zmq_bind (nds_publisher,loc);
assert (rc == 0);
printf("send data on %s\n",loc);
// Receive DAQ data in an infinite loop ***********************************
do {
// Initialize 0MQ message buffer
zmq_msg_init(&message);
// Get data when message size > 0
size = zmq_msg_recv(&message,daq_subscriber,0);
assert(size >= 0);
// Get pointer to message data
char *string = (char *)zmq_msg_data(&message);
// Copy data out of 0MQ message buffer to local memory buffer
memcpy(daqbuffer,string,size);
// Destroy the received message buffer
zmq_msg_close(&message);
// *******************************************************************
// Following is test finding cpu meter data
// Set data pointer to start of received data block
if(do_verbose) {
dataPtr = (char *)&mxDataBlock.zmqDataBlock[0];;
for(ii=0;ii<mxDataBlock.dcuTotalModels;ii++) {
// Increment data pointer to start of next FE data block
if(ii>0) dataPtr += mxDataBlock.zmqheader[ii-1].dataBlockSize;
// Extract the cpu meter data for each FE
cpu_meter = (int *)dataPtr;
for(jj=0;jj<10;jj++) {
ndsdata[jj].value = *cpu_meter;
cpu_meter ++;
}
}
// Print the CPU METER info on each 1 second mark
if(mxDataBlock.zmqheader[0].cycle == 0) {
ndsptr = &ndsdata[0];
int xsize = sizeof(channel_t);
for(jj=0;jj<totalchans;jj++) {
#if 0
sprintf(ndsbuffer,"%s %d %d",
ndsdata[jj].name,
ndsdata[jj].type,
ndsdata[jj].value);
#endif
memcpy(ndsbuffer,(void *)ndsptr,xsize);
zmq_send(nds_publisher,ndsbuffer,xsize,0);
ndsptr ++;
}
size = zmq_msg_recv(&message,daq_subscriber,0);
if(size >= 0) {
// Get pointer to message data
char *string = (char *)zmq_msg_data(&message);
// Copy data out of 0MQ message buffer to local memory buffer
memcpy(daqbuffer,string,size);
// Destroy the received message buffer
zmq_msg_close(&message);
// printf("message rcvd\n");
char *dptr = (char *)&mxDataBlock.zmqDataBlock[0];
for(ii=0;ii<num_chans;ii++) {
sprintf(ndsbuffer.ndschan.name,"%s",mydata[ii].name);
ndsbuffer.ndschan.type = mydata[ii].type;
ndsbuffer.ndschan.datarate = mydata[ii].datarate;
ndsbuffer.ndschan.datasize = mydata[ii].datasize;
char *ndptr = (char *)&ndsbuffer.ndsdata[0];
memcpy(ndptr,dptr,mydata[ii].datasize);
dptr += mydata[ii].datasize;
int xsize = sizeof(channel_t) + mydata[ii].datasize;
memcpy(msgbuffer,ndsptr,xsize);
zmq_send(nds_publisher,msgbuffer,xsize,0);
}
}
// *******************************************************************
}while(!myErrorSignal);
}while(keepRunning);
printf("closing out zmq\n");
zmq_close(daq_subscriber);
zmq_ctx_destroy(daq_context);
zmq_close(nds_publisher);
zmq_ctx_destroy(nds_context);
exit(0);
zmq_ctx_destroy(daq_context);
zmq_close(nds_publisher);
zmq_ctx_destroy(nds_context);
return(0);
}
......@@ -15,28 +15,24 @@
#include <signal.h>
#include <zmq.h>
#include <assert.h>
typedef struct channel_t {
char name[64];
int type;
int value;
}channel_t;
#include "zmq_daq.h"
int main (int argc, char *argv [])
{
channel_t ndschannel;
channel_t *ndsptr = &ndschannel;
nds_data_t ndschannel;
char *ndsptr = (char *)&ndschannel;
zmq_msg_t message;
int ii;
char loc[32];
char chnames[2][32] = {"X1:ATS-CPU_METER","X1:ATS-TIME_DIAG"};
printf ("Collecting updates from NDS proxy\n");
sprintf(loc,"%s%d","tcp://x2daqdc0-out:",DAQ_DATA_PORT);
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://scipe19:6666");
int rc = zmq_connect (subscriber, loc);
assert (rc == 0);
printf ("Collecting updates from NDS proxy %s\n",loc);
for(ii=1;ii<argc;ii++) {
char *filter = argv [ii];
......@@ -58,7 +54,11 @@ printf ("Collecting updates from NDS proxy\n");
name, &datatype, &datavalue);
#endif
zmq_msg_close(&message);
printf("Name = %s\ttype = %d\tValue = %d\n",ndschannel.name,ndschannel.type,ndschannel.value);
if(ndschannel.ndschan.type == 2) {
int *idata = (int *)&ndschannel.ndsdata[0];
printf("Name = %s\t",ndschannel.ndschan.name);
printf("data = \t%d\n",*idata);
}
}
zmq_close (subscriber);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment