Skip to content
Snippets Groups Projects
Commit 1d0805bd authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Merge changes to remove local_dc functionality from omx_xmit

parents 1f6c08f7 1e04d696
No related branches found
No related tags found
No related merge requests found
...@@ -4,24 +4,16 @@ ...@@ -4,24 +4,16 @@
//// ////
// //
#include <ctype.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <malloc.h>
#include <signal.h> #include <signal.h>
#include <unistd.h> #include <unistd.h>
#include <sys/ioctl.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h> #include <stdlib.h>
#include "../drv/crc.c" #include "../drv/crc.c"
#include "../include/daqmap.h"
#include "../include/drv/fb.h"
#include "../include/daq_core.h" #include "../include/daq_core.h"
#include "../drv/gpstime/gpstime.h"
#include "myriexpress.h" #include "myriexpress.h"
#include "mx_extensions.h" #include "mx_extensions.h"
#include <pthread.h> #include <pthread.h>
...@@ -46,7 +38,7 @@ MX_MUTEX_T stream_mutex; ...@@ -46,7 +38,7 @@ MX_MUTEX_T stream_mutex;
#define DFLT_ITER 1000 #define DFLT_ITER 1000
#define NUM_RREQ 16 /* currently constrained by MX_MCP_RDMA_HANDLES_CNT*/ #define NUM_RREQ 16 /* currently constrained by MX_MCP_RDMA_HANDLES_CNT*/
#define NUM_SREQ 256 /* currently constrained by MX_MCP_RDMA_HANDLES_CNT*/ #define NUM_SREQ 256 /* currently constrained by MX_MCP_RDMA_HANDLES_CNT*/
#define MSG_BUF_SIZE 0x200000 #define MSG_BUF_SIZE sizeof(daq_dc_data_t)
#define DO_HANDSHAKE 0 #define DO_HANDSHAKE 0
#define MATCH_VAL_MAIN (1 << 31) #define MATCH_VAL_MAIN (1 << 31)
...@@ -54,9 +46,6 @@ MX_MUTEX_T stream_mutex; ...@@ -54,9 +46,6 @@ MX_MUTEX_T stream_mutex;
#define __CDECL #define __CDECL
static struct rmIpcStr *shmIpcPtr[128];
static char *shmDataPtr[128];
static struct cdsDaqNetGdsTpNum *shmTpTable[128];
static const int header_size = sizeof(struct daq_multi_dcu_header_t); static const int header_size = sizeof(struct daq_multi_dcu_header_t);
static const int buf_size = DAQ_DCU_BLOCK_SIZE * 2; static const int buf_size = DAQ_DCU_BLOCK_SIZE * 2;
int modelrates[DAQ_TRANSIT_MAX_DCU]; int modelrates[DAQ_TRANSIT_MAX_DCU];
...@@ -68,17 +57,14 @@ char *zbuffer; ...@@ -68,17 +57,14 @@ char *zbuffer;
extern void *findSharedMemory(char *); extern void *findSharedMemory(char *);
extern void *findSharedMemorySize(char *,int); extern void *findSharedMemorySize(char *,int);
char modelnames[DAQ_TRANSIT_MAX_DCU][64];
char *sysname;
int do_verbose = 0; int do_verbose = 0;
static volatile int keepRunning = 1; static volatile int keepRunning = 1;
char *ifo; char *ifo = 0;
char *ifo_data; char *ifo_data = 0;
size_t cycle_data_size; size_t cycle_data_size = 0;
char msg_buffer[MSG_BUF_SIZE]; char msg_buffer[MSG_BUF_SIZE];
int symmetricom_fd = -1;
int daqStatBit[2]; int daqStatBit[2];
...@@ -89,10 +75,9 @@ int daqStatBit[2]; ...@@ -89,10 +75,9 @@ int daqStatBit[2];
void Usage() void Usage()
{ {
fprintf(stderr,"Usage of mx_fe:\n"); fprintf(stderr,"Usage of omx_xmit:\n");
fprintf(stderr,"mx_fe -s <models> <OPTIONS>\n"); fprintf(stderr,"mx_fe -s <models> <OPTIONS>\n");
fprintf(stderr," -b <buffer> : Name of the mbuf to concentrate the data to locally (defaults to ifo)\n"); fprintf(stderr," -b <buffer> : Name of the mbuf to read local data from (defaults to local_dc)\n");
fprintf(stderr," -s <value> : Name of FE control models\n");
fprintf(stderr," -m <value> : Local memory buffer size in megabytes\n"); fprintf(stderr," -m <value> : Local memory buffer size in megabytes\n");
fprintf(stderr," -l <filename> : log file name\n"); fprintf(stderr," -l <filename> : log file name\n");
fprintf(stderr," -v 1 : Enable verbose output\n"); fprintf(stderr," -v 1 : Enable verbose output\n");
...@@ -106,78 +91,40 @@ void Usage() ...@@ -106,78 +91,40 @@ void Usage()
fprintf(stderr,"\n"); fprintf(stderr,"\n");
} }
// ********************************************************************************************** /**
/// Get current GPS time from the symmetricom IRIG-B card * @brief Set the cycle counter to an invalid value.
unsigned long * @param header pointer to the input block header
symm_gps_time(unsigned long *frac, int *stt) { * @note Used to force a resync of the counter.
unsigned long t[3]; */
void
ioctl (symmetricom_fd, IOCTL_SYMMETRICOM_TIME, &t); reset_cycle_counter( volatile daq_multi_cycle_header_t* header )
t[1] *= 1000; {
t[1] += t[2]; header->curCycle = 0x50505050;
if (frac) *frac = t[1];
if (stt) *stt = 0;
return t[0];
} }
// ******************************************************************************* /**
/// See if the GPS card is locked. * @brief wait until the data in the input shared mem buffer has the
* @param header pointer to the input block header
* requested cycle counter.
*
* @returns non-zero if it we timeout
*/
int int
symm_ok() { wait_for_cycle( volatile daq_multi_cycle_header_t* header,
unsigned long req = 0; unsigned int requested_cycle )
ioctl (symmetricom_fd, IOCTL_SYMMETRICOM_STATUS, &req);
fprintf(stderr,"Symmetricom status: %s\n", req? "LOCKED": "UNCLOCKED");
return req;
}
// *******************************************************************************
// Wait for data ready from FE models
// *******************************************************************************
int
waitNextCycle2( int nsys,
unsigned int cyclereq, // Cycle to wait for
int reset, // Request to reset model ipc shared memory
int dataRdy[],
struct rmIpcStr *ipcPtr[]) // Pointer to IOP IPC shared memory
{ {
int iopRunning = 0; int timeout = 0;
int ii;
int threads_rdy = 0;
int timeout = 0;
// if reset, want to set all models cycle counters to impossible number
// this takes care of uninitialized or stopped models
if (reset)
{
for (ii = 0; ii < nsys; ++ii)
{
ipcPtr[ii]->cycle = 50;
}
}
usleep(1000);
// Wait until received data from at least 1 FE or timeout
do {
usleep(2000);
if(ipcPtr[0]->cycle == cyclereq)
{
iopRunning = 1;
dataRdy[0] = 1;
}
timeout += 1;
}while(!iopRunning && timeout < 500);
// Wait until data received from everyone or timeout do
timeout = 0; {
do { usleep( 2000 );
usleep(100); if ( header->curCycle == requested_cycle )
for(ii=1;ii<nsys;ii++) { {
if(ipcPtr[ii]->cycle == cyclereq && !dataRdy[ii]) threads_rdy ++; return 0;
if(ipcPtr[ii]->cycle == cyclereq) dataRdy[ii] = 1;
} }
timeout += 1; ++timeout;
}while(threads_rdy < nsys && timeout < 20); } while ( timeout < 500 );
return 1;
return(iopRunning);
} }
// ********************************************************************************************** // **********************************************************************************************
...@@ -192,9 +139,9 @@ void print_diags(int nsys, int lastCycle, int sendLength, daq_multi_dcu_data_t * ...@@ -192,9 +139,9 @@ void print_diags(int nsys, int lastCycle, int sendLength, daq_multi_dcu_data_t *
unsigned long sym_gps_sec = 0; unsigned long sym_gps_sec = 0;
unsigned long sym_gps_nsec = 0; unsigned long sym_gps_nsec = 0;
sym_gps_sec = symm_gps_time(&sym_gps_nsec, 0);
// Print diags in verbose mode // Print diags in verbose mode
fprintf(stderr,"\nTime = %d-%d size = %d\n",shmIpcPtr[0]->bp[lastCycle].timeSec,shmIpcPtr[0]->bp[lastCycle].timeNSec,sendLength); fprintf(stderr,"\nTime = %d-%d size = %d\n",ixDataBlock->header.dcuheader[0].timeSec,
ixDataBlock->header.dcuheader[0].timeNSec,sendLength);
fprintf(stderr,"Sym gps = %d-%d (time received)\n", (int)sym_gps_sec, (int)sym_gps_nsec); fprintf(stderr,"Sym gps = %d-%d (time received)\n", (int)sym_gps_sec, (int)sym_gps_nsec);
fprintf(stderr,"\tCycle = "); fprintf(stderr,"\tCycle = ");
for(ii=0;ii<nsys;ii++) fprintf(stderr,"\t\t%d",ixDataBlock->header.dcuheader[ii].cycle); for(ii=0;ii<nsys;ii++) fprintf(stderr,"\t\t%d",ixDataBlock->header.dcuheader[ii].cycle);
...@@ -209,178 +156,27 @@ void print_diags(int nsys, int lastCycle, int sendLength, daq_multi_dcu_data_t * ...@@ -209,178 +156,27 @@ void print_diags(int nsys, int lastCycle, int sendLength, daq_multi_dcu_data_t *
fprintf(stderr,"\n\tTPSize = "); fprintf(stderr,"\n\tTPSize = ");
for(ii=0;ii<nsys;ii++) fprintf(stderr,"\t\t%d",ixDataBlock->header.dcuheader[ii].tpBlockSize); for(ii=0;ii<nsys;ii++) fprintf(stderr,"\t\t%d",ixDataBlock->header.dcuheader[ii].tpBlockSize);
fprintf(stderr,"\n\tXmitSize = "); fprintf(stderr,"\n\tXmitSize = ");
for(ii=0;ii<nsys;ii++) fprintf(stderr,"\t\t%d",shmIpcPtr[ii]->dataBlockSize); for(ii=0;ii<nsys;ii++) fprintf(stderr,"\t\t%d",ixDataBlock->header.fullDataBlockSize);
fprintf(stderr,"\n\n "); fprintf(stderr,"\n\n ");
} }
// **********************************************************************************************
// Get control model loop rates from GDS param files
// Needed to properly size TP data into the data stream
int getmodelrate( int *rate, int *dcuid, char *modelname, char *gds_tp_dir) {
char gdsfile[128];
int ii = 0;
FILE *f = 0;
char *token = 0;
char *search = "=";
char line[80];
char *s = 0;
char *s1 = 0;
if (gds_tp_dir) {
sprintf(gdsfile, "%s/tpchn_%s.par", gds_tp_dir, modelname);
} else {
/// Need to get IFO and SITE info from environment variables.
s = getenv("IFO");
for (ii = 0; s[ii] != '\0'; ii++) {
if (isupper(s[ii])) s[ii] = (char) tolower(s[ii]);
}
s1 = getenv("SITE");
for (ii = 0; s1[ii] != '\0'; ii++) {
if (isupper(s1[ii])) s1[ii] = (char) tolower(s1[ii]);
}
sprintf(gdsfile, "/opt/rtcds/%s/%s/target/gds/param/tpchn_%s.par", s1, s, modelname);
}
f = fopen(gdsfile, "rt");
if (!f) return 0;
while(fgets(line,80,f) != NULL) {
line[strcspn(line, "\n")] = 0;
if (strstr(line, "datarate") != NULL) {
token = strtok(line, search);
token = strtok(NULL, search);
if (!token) continue;
while (*token && *token == ' ') {
++token;
}
*rate = atoi(token);
break;
}
}
fclose(f);
f = fopen(gdsfile, "rt");
if (!f) return 0;
while(fgets(line,80,f) != NULL) {
line[strcspn(line, "\n")] = 0;
if (strstr(line, "rmid") != NULL) {
token = strtok(line, search);
token = strtok(NULL, search);
if (!token) continue;
while (*token && *token == ' ') {
++token;
}
*dcuid = atoi(token);
break;
}
}
fclose(f);
return 0;
}
// **********************************************************************************************
int loadMessageBuffer( int nsys,
int lastCycle,
int status,
int dataRdy[]
)
{
int sendLength = 0;
int ii;
int dataXferSize;
char *dataBuff;
int myCrc = 0;
int crcLength = 0;
// Set pointer to 0MQ message data block
zbuffer = (char *)&ixDataBlock->dataBlock[0];
// Initialize data send length to size of message header
sendLength = header_size;
// Set number of FE models that have data in this message
ixDataBlock->header.fullDataBlockSize = 0;
int db = 0;
// Loop thru all FE models
for (ii=0;ii<nsys;ii++) {
if(dataRdy[ii]) {
// Set heartbeat monitor for return to DAQ software
if (lastCycle == 0) shmIpcPtr[ii]->reqAck ^= daqStatBit[0];
// Set DCU ID in header
ixDataBlock->header.dcuheader[db].dcuId = shmIpcPtr[ii]->dcuId;
// Set DAQ .ini file CRC checksum
ixDataBlock->header.dcuheader[db].fileCrc = shmIpcPtr[ii]->crc;
// Set 1/16Hz cycle number
ixDataBlock->header.dcuheader[db].cycle = shmIpcPtr[ii]->cycle;
// Set GPS seconds
ixDataBlock->header.dcuheader[db].timeSec = shmIpcPtr[ii]->bp[lastCycle].timeSec;
// Set GPS nanoseconds
ixDataBlock->header.dcuheader[db].timeNSec = shmIpcPtr[ii]->bp[lastCycle].timeNSec;
crcLength = shmIpcPtr[ii]->bp[lastCycle].crc;
// Set Status -- as running
ixDataBlock->header.dcuheader[db].status = 2;
// Indicate size of data block
// ********ixDataBlock->header.dcuheader[db].dataBlockSize = shmIpcPtr[ii]->dataBlockSize;
ixDataBlock->header.dcuheader[db].dataBlockSize = crcLength;
// Prevent going beyond MAX allowed data size
if (ixDataBlock->header.dcuheader[db].dataBlockSize > DAQ_DCU_BLOCK_SIZE)
ixDataBlock->header.dcuheader[db].dataBlockSize = DAQ_DCU_BLOCK_SIZE;
// Calculate TP data size
ixDataBlock->header.dcuheader[db].tpCount = (unsigned int)shmTpTable[ii]->count & 0xff;
ixDataBlock->header.dcuheader[db].tpBlockSize = sizeof(float) * modelrates[ii] * ixDataBlock->header.dcuheader[db].tpCount / DAQ_NUM_DATA_BLOCKS_PER_SECOND;
// Copy GDSTP table to xmission buffer header
memcpy(&(ixDataBlock->header.dcuheader[db].tpNum[0]),
&(shmTpTable[ii]->tpNum[0]),
sizeof(int)*ixDataBlock->header.dcuheader[db].tpCount);
// Set pointer to dcu data in shared memory
dataBuff = (char *)(shmDataPtr[ii] + lastCycle * buf_size);
// Copy data from shared memory into local buffer
dataXferSize = ixDataBlock->header.dcuheader[db].dataBlockSize + ixDataBlock->header.dcuheader[db].tpBlockSize;
// if the dataXferSize is too large, something is wrong so return error message.
if(dataXferSize > DAQ_DCU_BLOCK_SIZE) return(-1);
memcpy((void *)zbuffer, dataBuff, dataXferSize);
// Calculate CRC on the data and add to header info
myCrc = 0;
myCrc = crc_ptr((char *)zbuffer, crcLength, 0);
myCrc = crc_len(crcLength, myCrc);
ixDataBlock->header.dcuheader[db].dataCrc = myCrc;
// Increment the 0mq data buffer pointer for next FE
zbuffer += dataXferSize;
// Increment the 0mq message size with size of FE data block
sendLength += dataXferSize;
// Increment the data block size for the message, this includes regular data + TP data
ixDataBlock->header.fullDataBlockSize += dataXferSize;
// Update heartbeat monitor to DAQ code
if (lastCycle == 0) shmIpcPtr[ii]->reqAck ^= daqStatBit[1];
db ++;
}
}
ixDataBlock->header.dcuTotalModels = db;
return sendLength;
}
// ********************************************************************************************** // **********************************************************************************************
int send_to_local_memory(int nsys, int send_to_local_memory(int nsys,
int xmitData,
int send_delay_ms, int send_delay_ms,
mx_endpoint_t ep, mx_endpoint_t ep,
int64_t his_nic_id, int64_t his_nic_id,
uint16_t his_eid, uint16_t his_eid,
int len, uint32_t match_val)
uint32_t match_val,
uint16_t my_dcu)
{ {
int do_wait = 1; int do_wait = 1;
char *nextData; char *nextData = 0;
int ii; int ii =0;
int lastCycle = 0; int lastCycle = 0;
unsigned int nextCycle = 0; unsigned int nextCycle = 0;
int sync2iop = 1;
int status = 0; int status = 0;
int dataRdy[10]; int dataRdy[10];
...@@ -391,43 +187,44 @@ int send_to_local_memory(int nsys, ...@@ -391,43 +187,44 @@ int send_to_local_memory(int nsys,
uint32_t result; uint32_t result;
mx_endpoint_addr_t dest; mx_endpoint_addr_t dest;
uint32_t filter = FILTER; uint32_t filter = FILTER;
int init_mx = 1;
for(ii=0;ii<10;ii++) dataRdy[ii] = 0; for(ii=0;ii<10;ii++) dataRdy[ii] = 0;
mx_set_error_handler(MX_ERRORS_RETURN); mx_set_error_handler(MX_ERRORS_RETURN);
int myErrorSignal = 1; int myErrorSignal = 1;
do { do{
mx_return_t conStat = mx_connect(ep, his_nic_id, his_eid, filter,
1000, &dest);
if (conStat != MX_SUCCESS)
{
myErrorSignal = 1;
}
else
{
myErrorSignal = 0;
fprintf(stderr, "Connection Made\n");
mx_return_t ret = mx_set_request_timeout(ep, 0, 1); // Set one second timeout
if (ret != MX_SUCCESS)
{
fprintf(stderr, "Failed to set request timeout %s\n", mx_strerror(ret));
exit(1);
}
if(init_mx && xmitData) { }
do{ } while(myErrorSignal);
mx_return_t conStat = mx_connect(ep, his_nic_id, his_eid, filter,
1000, &dest); do {
if (conStat != MX_SUCCESS) myErrorSignal = 1;
else {
myErrorSignal = 0;
fprintf(stderr, "Connection Made\n");
mx_return_t ret = mx_set_request_timeout(ep, 0, 1); // Set one second timeout
if (ret != MX_SUCCESS) {
fprintf(stderr, "Failed to set request timeout %s\n", mx_strerror(ret));
exit(1);
}
}
}while(myErrorSignal);
init_mx = 0;
}
myErrorSignal = 0; myErrorSignal = 0;
for(ii=0;ii<nsys;ii++) dataRdy[ii] = 0; for(ii=0;ii<nsys;ii++) dataRdy[ii] = 0;
status = waitNextCycle2(nsys,nextCycle,sync2iop,dataRdy,shmIpcPtr);
status = wait_for_cycle(ifo_header, nextCycle);
// status = waitNextCycle(nextCycle,sync2iop,shmIpcPtr[0]); // status = waitNextCycle(nextCycle,sync2iop,shmIpcPtr[0]);
if(!status) { if(status != 0) {
keepRunning = 0;; keepRunning = 0;;
return(0); return(0);
} }
else sync2iop = 0;
// IOP will be first model ready // IOP will be first model ready
// Need to wait for 2K models to reach end of their cycled // Need to wait for 2K models to reach end of their cycled
...@@ -437,47 +234,42 @@ int send_to_local_memory(int nsys, ...@@ -437,47 +234,42 @@ int send_to_local_memory(int nsys,
nextData = (char *)ifo_data; nextData = (char *)ifo_data;
nextData += cycle_data_size * nextCycle; nextData += cycle_data_size * nextCycle;
ixDataBlock = (daq_multi_dcu_data_t *)nextData; ixDataBlock = (daq_multi_dcu_data_t *)nextData;
int sendLength = loadMessageBuffer(nsys, nextCycle, status,dataRdy); int sendLength = ixDataBlock->header.fullDataBlockSize + sizeof(daq_multi_dcu_header_t);
if(sendLength == -1 || sendLength > MSG_BUF_SIZE) { if(sendLength == -1 || sendLength > MSG_BUF_SIZE) {
fprintf(stderr, "Message buffer overflow error\n"); fprintf(stderr, "Message buffer overflow error\n");
return(-1); return(-1);
} }
// Print diags in verbose mode // Print diags in verbose mode
if(nextCycle == 8 && do_verbose) print_diags(nsys,lastCycle,sendLength,ixDataBlock); if(nextCycle == 8 && do_verbose) print_diags(nsys,lastCycle,sendLength,ixDataBlock);
// Write header info
ifo_header->curCycle = nextCycle; // Copy data to 0mq message buffer
ifo_header->cycleDataSize = cycle_data_size; memcpy((void*)&msg_buffer,nextData,sendLength);
ifo_header->maxCycle = DAQ_NUM_DATA_BLOCKS_PER_SECOND; // Send Data
usleep(send_delay_ms * 1000);
if(xmitData) { seg.segment_ptr = &msg_buffer;
// Copy data to 0mq message buffer seg.segment_length = sendLength;
memcpy((void*)&msg_buffer,nextData,sendLength); mx_return_t res = mx_isend(ep, &seg, 1, dest, match_val, NULL, &req[cur_req]);
// Send Data if (res != MX_SUCCESS) {
usleep(send_delay_ms * 1000); fprintf(stderr, "mx_isend failed ret=%d\n", res);
seg.segment_ptr = &msg_buffer; myErrorSignal = 1;
seg.segment_length = sendLength; break;
mx_return_t res = mx_isend(ep, &seg, 1, dest, match_val, NULL, &req[cur_req]); }
if (res != MX_SUCCESS) {
fprintf(stderr, "mx_isend failed ret=%d\n", res);
myErrorSignal = 1;
break;
}
again: again:
res = mx_wait(ep, &req[cur_req], 50, &stat, &result); res = mx_wait(ep, &req[cur_req], 50, &stat, &result);
if (res != MX_SUCCESS) { if (res != MX_SUCCESS) {
fprintf(stderr, "mx_cancel() failed with status %s\n", mx_strerror(res)); fprintf(stderr, "mx_cancel() failed with status %s\n", mx_strerror(res));
exit(1); exit(1);
} }
if (result == 0) { if (result == 0) {
fprintf(stderr, "trying again \n"); fprintf(stderr, "trying again \n");
goto again; goto again;
// myErrorSignal = 1; // myErrorSignal = 1;
} }
if (stat.code != MX_STATUS_SUCCESS) { if (stat.code != MX_STATUS_SUCCESS) {
fprintf(stderr, "isendxxx failed with status %s\n", mx_strstatus(stat.code)); fprintf(stderr, "isendxxx failed with status %s\n", mx_strstatus(stat.code));
myErrorSignal = 1; myErrorSignal = 1;
} }
}
nextCycle = (nextCycle + 1) % 16; nextCycle = (nextCycle + 1) % 16;
...@@ -506,13 +298,7 @@ main(int argc,char *argv[]) ...@@ -506,13 +298,7 @@ main(int argc,char *argv[])
int max_data_size_mb = 64; int max_data_size_mb = 64;
int max_data_size = 0; int max_data_size = 0;
int error = 0; int error = 0;
int status = -1; char *buffer_name = "local_dc";
unsigned long gps_frac = 0;
int gps_stt = 0;
int gps_ok = 0;
unsigned long gps_time = 0;
int sendViaOmx = 0;
char *buffer_name = "ifo";
int send_delay_ms = 0; int send_delay_ms = 0;
mx_endpoint_t ep; mx_endpoint_t ep;
...@@ -523,10 +309,6 @@ main(int argc,char *argv[]) ...@@ -523,10 +309,6 @@ main(int argc,char *argv[])
uint16_t his_eid; uint16_t his_eid;
char *rem_host; char *rem_host;
char *sysname; char *sysname;
int len;
int iter;
int do_wait;
int do_bothways;
extern char *optarg; extern char *optarg;
mx_return_t ret; mx_return_t ret;
...@@ -537,10 +319,6 @@ main(int argc,char *argv[]) ...@@ -537,10 +319,6 @@ main(int argc,char *argv[])
my_eid = DFLT_EID; my_eid = DFLT_EID;
his_eid = DFLT_EID; his_eid = DFLT_EID;
board_id = MX_ANY_NIC; board_id = MX_ANY_NIC;
len = DFLT_LEN;
iter = DFLT_ITER;
do_wait = 0;
do_bothways = 0;
...@@ -554,14 +332,15 @@ main(int argc,char *argv[]) ...@@ -554,14 +332,15 @@ main(int argc,char *argv[])
} }
/* Get the parameters */ /* Get the parameters */
while ((counter = getopt(argc, argv, "b:e:m:h:v:s:r:t:d:l:D:")) != EOF) while ((counter = getopt(argc, argv, "b:e:m:h:v:r:t:d:l:D:")) != EOF)
{
switch(counter) { switch(counter) {
case 't': case 't':
rem_host = optarg; rem_host = optarg;
sendViaOmx = 1;
break; break;
case 'b': case 'b':
buffer_name = optarg; buffer_name = optarg;
fprintf(stderr, "Buffer name = '%s'\n", buffer_name);
break; break;
case 'm': case 'm':
...@@ -576,10 +355,6 @@ main(int argc,char *argv[]) ...@@ -576,10 +355,6 @@ main(int argc,char *argv[])
} }
break; break;
case 's':
sysname = optarg;
fprintf (stderr,"sysnames = %s\n",sysname);
continue;
case 'l': case 'l':
if (0 == freopen(optarg, "w", stdout)) { if (0 == freopen(optarg, "w", stdout)) {
perror ("freopen"); perror ("freopen");
...@@ -590,6 +365,7 @@ main(int argc,char *argv[]) ...@@ -590,6 +365,7 @@ main(int argc,char *argv[])
break; break;
case 'r': case 'r':
his_eid = atoi(optarg); his_eid = atoi(optarg);
fprintf (stderr, "remoteeid = %d\n", his_eid);
break; break;
case 'v': case 'v':
do_verbose = atoi(optarg); do_verbose = atoi(optarg);
...@@ -607,84 +383,30 @@ main(int argc,char *argv[]) ...@@ -607,84 +383,30 @@ main(int argc,char *argv[])
case 'D': case 'D':
send_delay_ms = atoi(optarg); send_delay_ms = atoi(optarg);
break; break;
default:
fprintf(stderr, "Not handling argument '%c'\n", counter);
}
} }
max_data_size = max_data_size_mb * 1024*1024; max_data_size = max_data_size_mb * 1024*1024;
// If sending to DAQ via net enabled, ensure all options have been set // If sending to DAQ via net enabled, ensure all options have been set
if (sendViaOmx) { mx_init();
mx_init(); MX_MUTEX_INIT(&stream_mutex);
MX_MUTEX_INIT(&stream_mutex); if(my_eid == DFLT_EID || his_eid == DFLT_EID)
if(my_eid == DFLT_EID || his_eid == DFLT_EID){ {
fprintf(stderr, "\n***ERROR\n***Must set both -e and -r options to send data to DAQ\n\n"); fprintf(stderr, "\n***ERROR\n***Must set both -e and -r options to send data to DAQ\n\n");
Usage(); Usage();
return(0); return(0);
}
fprintf(stderr,"Writing DAQ data to local shared memory and sending out on Open-MX\n");
if(my_eid == 0) {
daqStatBit[0] = 1;
daqStatBit[1] = 2;
} else {
daqStatBit[0] = 4;
daqStatBit[1] = 8;
}
} else {
daqStatBit[0] = 1;
daqStatBit[1] = 2;
fprintf(stderr,"Writing DAQ data to local shared memory only \n");
} }
fprintf(stderr,"Writing DAQ data to local shared memory and sending out on Open-MX\n");
// Parse the model names if(my_eid == 0) {
if(sysname != NULL) { daqStatBit[0] = 1;
fprintf(stderr,"System names: %s\n", sysname); daqStatBit[1] = 2;
sprintf(modelnames[0],"%s",strtok(sysname, " "));
for(;;) {
char *s = strtok(0, " ");
if (!s) break;
sprintf(modelnames[nsys],"%s",s);
dcuId[nsys] = 0;
nsys++;
}
} else { } else {
Usage(); daqStatBit[0] = 4;
return(0); daqStatBit[1] = 8;
}
// Open file descriptor for the gpstime driver
symmetricom_fd = open ("/dev/gpstime", O_RDWR | O_SYNC);
if (symmetricom_fd < 0) {
perror("/dev/gpstime");
exit(1);
}
gps_ok = symm_ok();
gps_time = symm_gps_time(&gps_frac, &gps_stt);
fprintf(stderr,"GPS TIME = %ld\tfrac = %ld\tstt = %d\n",gps_time,gps_frac,gps_stt);
// Find the shared memory locations for the various model names
for(ii=0;ii<nsys;ii++) {
char shmem_fname[128];
sprintf(shmem_fname, "%s_daq", modelnames[ii]);
void *dcu_addr = findSharedMemory(shmem_fname);
if (dcu_addr == NULL) {
fprintf(stderr, "Can't map shmem\n");
exit(-1);
} else {
fprintf(stderr," %s mapped at 0x%lx\n",modelnames[ii],(unsigned long)dcu_addr);
}
shmIpcPtr[ii] = (struct rmIpcStr *)((char *)dcu_addr + CDS_DAQ_NET_IPC_OFFSET);
shmDataPtr[ii] = ((char *)dcu_addr + CDS_DAQ_NET_DATA_OFFSET);
shmTpTable[ii] = (struct cdsDaqNetGdsTpNum *)((char *)dcu_addr + CDS_DAQ_NET_GDS_TP_TABLE_OFFSET);
}
// Get model rates to get GDS TP data sizes.
for (ii = 0; ii < nsys; ii++) {
status = getmodelrate(&modelrates[ii],&dcuid[ii],modelnames[ii], gds_tp_dir);
fprintf(stderr,"Model %s rate = %d dcuid = %d\n",modelnames[ii],modelrates[ii],dcuid[ii]);
if (status != 0) {
fprintf(stderr, "Unable to determine the rate of %s\n", modelnames[ii]);
exit(1);
}
} }
// Get pointers to local DAQ mbuf // Get pointers to local DAQ mbuf
...@@ -694,33 +416,36 @@ main(int argc,char *argv[]) ...@@ -694,33 +416,36 @@ main(int argc,char *argv[])
cycle_data_size = (max_data_size - sizeof(daq_multi_cycle_header_t))/DAQ_NUM_DATA_BLOCKS_PER_SECOND; cycle_data_size = (max_data_size - sizeof(daq_multi_cycle_header_t))/DAQ_NUM_DATA_BLOCKS_PER_SECOND;
cycle_data_size -= (cycle_data_size % 8); cycle_data_size -= (cycle_data_size % 8);
fprintf(stderr, "ifo mapped to %p\n", ifo);
// Setup signal handler to catch Control C // Setup signal handler to catch Control C
signal(SIGINT,intHandler); signal(SIGINT,intHandler);
sleep(1); sleep(1);
if(sendViaOmx) {
// Open the NIC endpoint to send data // Open the NIC endpoint to send data
fprintf(stderr,"Open endpoint \n"); fprintf(stderr,"Open endpoint \n");
ret = mx_open_endpoint(board_id, my_eid, filter, NULL, 0, &ep); ret = mx_open_endpoint(board_id, my_eid, filter, NULL, 0, &ep);
if (ret != MX_SUCCESS) { if (ret != MX_SUCCESS) {
fprintf(stderr, "Failed to open endpoint %s\n", mx_strerror(ret)); fprintf(stderr, "Failed to open endpoint %s\n", mx_strerror(ret));
exit(1); exit(1);
}
sleep(1);
mx_hostname_to_nic_id(rem_host, &his_nic_id);
} }
sleep(1);
mx_hostname_to_nic_id(rem_host, &his_nic_id);
reset_cycle_counter(ifo_header);
// Enter infinite loop of reading control model data and writing to local shared memory // Enter infinite loop of reading control model data and writing to local shared memory
do { do {
error = send_to_local_memory(nsys, sendViaOmx, send_delay_ms,ep,his_nic_id,his_eid,len,MATCH_VAL_MAIN,my_eid); error = send_to_local_memory(nsys, send_delay_ms,ep,his_nic_id,his_eid,MATCH_VAL_MAIN);
} while (error == 0 && keepRunning == 1); } while (error == 0 && keepRunning == 1);
// Cleanup Open-MX stuff // Cleanup Open-MX stuff
if(sendViaOmx) {
fprintf(stderr,"Closing out OpenMX and exiting\n"); fprintf(stderr,"Closing out OpenMX and exiting\n");
mx_close_endpoint(ep); mx_close_endpoint(ep);
mx_finalize(); mx_finalize();
}
return 0; return 0;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment