Skip to content
Snippets Groups Projects
Commit c41afd6a authored by Rolf Bork's avatar Rolf Bork
Browse files

Minor updates to zmq_threads to prevent segfault when program exits.

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4343 6dcd42c9-f523-4c6d-aada-af552506706e
parent ca463385
No related branches found
No related tags found
No related merge requests found
......@@ -8,19 +8,28 @@ Files:
- Usage: zmq_multi_stream -s systems, where:
- systems are space delimited names of all models running on the FE computer
eg -s "x2ioplsc0 x2lsc x2omc x2lscaux"
- Status: Works fine if all models running on the FE
- Needs plenty of work to handle various fault cases eg not all models running, etc.
- zmq_threads.c:
- Purpose: Receive DAQ data from FE computers. Code uses a thread to connect to each FE computer DAQ net.
- Usage: zmq_threads -s computer_names, where:
- computer_names = hostnames corresponding to FE computer DAQ LAN connection.
e.g. -s "x2lsc0_daq x2oaf0_daq"
- Status: Functional to point of being able to rcv data and place into larger DC block.
- Again, needs plenty of work to handle rcv fault conditions.
- zmq_multi_rcvr:
- Purpose: Single threaded version of zmq_threads.c
- Status: Not functional, as work continued with zmq_threads instead.
- zmq_proxy.c
- Purpose: Receive data from DAQ LAN and retranmit data in channel by channel blocks.
- Status: Not functional. Only used as a test case on Caltech DTS for proof of principal.
- Easiest way to make usable and generic is to now start with zmq_threads and
change out DC code with channel by channel xmission code.
- zmq_proxy_client.c
- Purpose: Subscribe to live data streams of single or multiple DAQ channels by name
from the zmq_proxy.
- Status: Not functional. Only used as a test case on Caltech DTS for proof of principal.
......@@ -24,8 +24,10 @@ typedef struct daq_data_t_v1 {
} daq_data_t_v1;
//
#define DAQ_ZMQ_MODELS_PER_FE 6
#define DAQ_ZMQ_DCU_SIZE 0x1000000
#define DAQ_ZMQ_MAX_BYTE_SEC 0x6000000 // 100MB per sec
#define DAQ_ZMQ_DCU_SIZE 0x1000000 // 16MB per sec from FE computer
#define DAQ_ZMQ_BLOCK_SIZE (DAQ_ZMQ_DCU_SIZE/DAQ_NUM_DATA_BLOCKS)
#define DAQ_ZMQ_DC_BLOCK_SIZE (DAQ_ZMQ_MAX_BYTE_SEC/DAQ_NUM_DATA_BLOCKS)
#define DAQ_DATA_PORT 5555
#define DAQ_GDS_DATA_PORT 5556
//
......@@ -73,5 +75,5 @@ typedef struct gds_multi_dcu_data_t {
typedef struct daq_dc_data_t {
int dcuTotalModels;
daq_msg_header_t zmqheader[128];
char zmqDataBlock[DAQ_ZMQ_DCU_SIZE];
char zmqDataBlock[DAQ_ZMQ_DC_BLOCK_SIZE];
}daq_dc_data_t;
......@@ -32,7 +32,8 @@ unsigned int do_wait = 0; // Wait for this number of milliseconds before startin
extern void *findSharedMemory(char *);
void *daq_context[DCU_COUNT];
void *daq_subscriber[DCU_COUNT];
daq_multi_dcu_data_t mxDataBlockFull[16];
daq_dc_data_t mxDataBlockFull[16];
daq_multi_dcu_data_t mxDataBlockG[32][16];
unsigned int tstatus[16];
int stop_working_threads = 0;
int start_acq = 0;
......@@ -60,11 +61,10 @@ void *rcvr_thread(void *arg) {
int mt = *mythread;
printf("myarg = %d\n",mt);
zmq_msg_t message;
daq_multi_dcu_data_t mxDataBlock;
int ii;
int cycle = 0;
int acquire = 0;
unsigned int myts = 0;
daq_multi_dcu_data_t mxDataBlock;
do {
zmq_msg_init(&message);
......@@ -79,24 +79,21 @@ void *rcvr_thread(void *arg) {
// Destroy the received message buffer
zmq_msg_close(&message);
for (ii = 0;ii<mxDataBlock.dcuTotalModels;ii++) {
int tdd = mxDataBlock.zmqheader[ii].dcuId;
cycle = mxDataBlock.zmqheader[ii].cycle;
myts = mxDataBlock.zmqheader[ii].timeSec;
mxDataBlockFull[cycle].zmqheader[tdd].dcuId = tdd;
mxDataBlockFull[cycle].zmqheader[tdd].fileCrc = mxDataBlock.zmqheader[ii].fileCrc;
mxDataBlockFull[cycle].zmqheader[tdd].status = mxDataBlock.zmqheader[ii].status;
mxDataBlockFull[cycle].zmqheader[tdd].cycle = mxDataBlock.zmqheader[ii].cycle;
mxDataBlockFull[cycle].zmqheader[tdd].timeSec = mxDataBlock.zmqheader[ii].timeSec;
mxDataBlockFull[cycle].zmqheader[tdd].timeNSec = mxDataBlock.zmqheader[ii].timeNSec;
mxDataBlockFull[cycle].zmqheader[tdd].dataCrc = mxDataBlock.zmqheader[ii].dataCrc;
mxDataBlockFull[cycle].zmqheader[tdd].dataBlockSize = mxDataBlock.zmqheader[ii].dataBlockSize;
// Copy data to global buffer
char *localbuff = (char *)&mxDataBlockG[mt][cycle];
memcpy(localbuff,daqbuffer,size);
}
// Always start on cycle 0 after told to start by main thread
if(cycle == 0 && start_acq) acquire = 1;
// Set the cycle data ready bit
if(acquire) {
tstatus[cycle] |= (1 << mt);
printf("c = %d\t%d\t%d\t%d\n",mt,myts,cycle,size);
}
// Run until told to stop by main thread
} while(!stop_working_threads);
printf("Stopping thread %d\n",mt);
usleep(200000);
return(0);
}
......@@ -117,28 +114,19 @@ main(int argc, char **argv)
// Create DAQ message area in local memory
daq_multi_dcu_data_t mxDataBlock;
daq_multi_dcu_data_t mxDataBlocktest[4];
// Declare pointer to local memory message area
printf("size of mxdata = %d\n",sizeof(mxDataBlock));
printf("size of mxdata = %ld\n",sizeof(mxDataBlock));
/* set up defaults */
sysname = NULL;
int myErrorSignal = 0;
int size;
int ii;
int mydatardy = 0;
// Declare 0MQ message pointers
int rc;
zmq_msg_t message;
zmq_pollitem_t daq_items[DCU_COUNT];
char loc[30];
char loc[40];
// Test pointer to cpu meter data
int *cpu_meter;
int mycpu[4][6];
int myconnects = 0;
while ((c = getopt(argc, argv, "hd:s:l:Vvw:x")) != EOF) switch(c) {
case 's':
......@@ -191,84 +179,14 @@ main(int argc, char **argv)
dataRdy |= (1 << ii);
}
#if 0
// Receive DAQ data in an infinite loop ***********************************
zmq_msg_init(&message);
do {
// Initialize 0MQ message buffer
// zmq_msg_init(&message);
for(ii=0;ii<nsys;ii++) {
zmq_poll(daq_items,nsys,-1);
if(daq_items[ii].revents & ZMQ_POLLIN) {
// Get data when message size > 0
size = zmq_msg_recv(&message,daq_subscriber[ii],0);
assert(size >= 0);
// Get pointer to message data
char *string = (char *)zmq_msg_data(&message);
char *daqbuffer = (char *)&mxDataBlocktest[ii];
// Copy data out of 0MQ message buffer to local memory buffer
memcpy(daqbuffer,string,size);
// Destroy the received message buffer
zmq_msg_close(&message);
if (ii==0) myconnects |= 1;
if (ii==1) myconnects |= 2;
zmq_msg_init(&message);
}
}
// *******************************************************************
// Following is test finding cpu meter data
// Set data pointer to start of received data block
if(do_verbose) {
int mytotaldcu = 0;
int mytotaldata = 0;
for(int jj=0;jj<3;jj++) {
char *dataPtr = (char *)&mxDataBlocktest[jj].zmqDataBlock[0];;
mytotaldcu += mxDataBlocktest[jj].dcuTotalModels;
for(ii=0;ii<mxDataBlocktest[jj].dcuTotalModels;ii++) {
// Increment data pointer to start of next FE data block
if(ii>0) dataPtr += mxDataBlocktest[jj].zmqheader[ii-1].dataBlockSize;
mytotaldata += mxDataBlocktest[jj].zmqheader[ii].dataBlockSize;
// Extract the cpu meter data for each FE
cpu_meter = (int *)dataPtr;
cpu_meter += 2;
mycpu[jj][ii] = *cpu_meter;
}
if(mxDataBlocktest[jj].zmqheader[0].cycle == 0) mydatardy |= (1 << jj);
}
// Print the CPU METER info on each 1 second mark
// if(mxDataBlocktest[0].zmqheader[0].cycle == 0) {
if(mydatardy == 7) {
printf("Total DCU = %d %d totalsize = %d\n", mytotaldcu,myconnects,mytotaldata);
for(int jj=0;jj<3;jj++) {
for(ii=0;ii<mxDataBlocktest[jj].dcuTotalModels;ii++) {
printf("DCU = %d\tCPU METER = \t%d",mxDataBlocktest[jj].zmqheader[ii].dcuId,mycpu[jj][ii]);
printf("\tTime = %d %d\tSize = \t%d bytes\n",
mxDataBlocktest[jj].zmqheader[ii].timeSec,
mxDataBlocktest[jj].zmqheader[ii].timeNSec,
mxDataBlocktest[jj].zmqheader[ii].dataBlockSize);
}
}
}
// if(mxDataBlocktest[0].zmqheader[0].cycle == 0) {
if(mydatardy == 7) {
mydatardy = 0;
printf("\n*******************************************************\n");
}
}
// *******************************************************************
myErrorSignal ++;
}while(myErrorSignal < 640);
#endif
int mastercycle = 0;
int loop = 0;
start_acq = 1;
int64_t mytime = 0;
int64_t mylasttime = 0;
int64_t myptime = 0;
int mytotaldcu = 0;
char *zbuffer;
int mytdbs = 0;
do {
do {
usleep(2000);
......@@ -278,17 +196,47 @@ main(int argc, char **argv)
myptime = mytime - mylasttime;
mylasttime = mytime;
printf("Data rday for cycle = %d\t%ld\n",loop,myptime);
mytotaldcu = 0;
zbuffer = (char *)&mxDataBlockFull[loop].zmqDataBlock[0];
mytdbs = 0;
for(ii=0;ii<nsys;ii++) {
int myc = mxDataBlockG[ii][loop].dcuTotalModels;
// printf("\tModel %d = %d\n",ii,myc);
for(int jj=0;jj<myc;jj++) {
mxDataBlockFull[loop].zmqheader[mytotaldcu].dcuId = mxDataBlockG[ii][loop].zmqheader[jj].dcuId;
mxDataBlockFull[loop].zmqheader[mytotaldcu].fileCrc = mxDataBlockG[ii][loop].zmqheader[jj].fileCrc;
mxDataBlockFull[loop].zmqheader[mytotaldcu].status = mxDataBlockG[ii][loop].zmqheader[jj].status;
mxDataBlockFull[loop].zmqheader[mytotaldcu].cycle = mxDataBlockG[ii][loop].zmqheader[jj].cycle;
mxDataBlockFull[loop].zmqheader[mytotaldcu].timeSec = mxDataBlockG[ii][loop].zmqheader[jj].timeSec;
mxDataBlockFull[loop].zmqheader[mytotaldcu].timeNSec = mxDataBlockG[ii][loop].zmqheader[jj].timeNSec;
int mydbs = mxDataBlockG[ii][loop].zmqheader[jj].dataBlockSize;
// printf("\t\tdcuid = %d\n",mydbs);
mxDataBlockFull[loop].zmqheader[mytotaldcu].dataBlockSize = mydbs;
char *mbuffer = (char *)&mxDataBlockG[ii][loop].zmqDataBlock[0];
memcpy(zbuffer,mbuffer,mydbs);
zbuffer += mydbs;
mytdbs += mydbs;
mytotaldcu ++;
}
}
printf("\tTotal DCU = %d\tSize = %d\n",mytotaldcu,mytdbs);
loop ++;
loop %= 16;
myErrorSignal ++;
}while (myErrorSignal < 132);
printf("stopping threads %d \n",nsys);
stop_working_threads = 1;
#if 0
for(ii=0;ii<nsys;ii++) {
pthread_join(thread_id[ii],NULL);
rc = pthread_join(thread_id[ii],NULL);
if(rc != 0) printf("thread join fail %d %d\n",ii,rc);
}
#endif
sleep(2);
printf("closing out zmq\n");
for(ii=0;ii<nsys;ii++) {
zmq_close(daq_subscriber[ii]);
zmq_ctx_destroy(daq_context[ii]);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment