Skip to content
Snippets Groups Projects
Commit 613308e6 authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Updates to answer review notes.

Fix usage message in local_dc.

Remove commented out code in the makefile and associated .c code

Looking for variables with ix_ prefix and removed an usused bit.
parent 2a559897
No related branches found
No related tags found
1 merge request!13Implementing dix_xmit and updating dix_recv.
......@@ -43,31 +43,6 @@ rfm.o: ../drv/rfm.c
crc.o: ../drv/crc.c
$(CC) $(CFLAGS) -c $< -o $@
# ix_rcvr: ix_rcvr.c
# $(CC) $(CFLAGS) ix_rcvr.c -c -o ix_rcvr.o
# $(CC) $(CFLAGS) ../drv/rfm.c -c -o rfm.o
# $(CC) $(FLAGS) -o ix_rcvr ix_rcvr.o rfm.o -L $(API_LIB_PATH) -lsisci
# sync
#
# ix_rcvr_threads: ix_rcvr_threads.c
# $(CC) $(CFLAGS) ix_rcvr_threads.c -c -o ix_rcvr_threads.o
# $(CC) $(CFLAGS) ../drv/rfm.c -c -o rfm.o
# $(CC) $(FLAGS) -o ix_rcvr_threads ix_rcvr_threads.o rfm.o -L $(API_LIB_PATH) -lsisci
# sync
#
# ix_dc_xmit: ix_dc_xmit.c
# $(CC) $(CFLAGS) ix_dc_xmit.c -c -o ix_dc_xmit.o
# $(CC) $(CFLAGS) ../drv/rfm.c -c -o rfm.o
# $(CC) $(FLAGS) -o ix_dc_xmit ix_dc_xmit.o rfm.o -L $(API_LIB_PATH) -lsisci
# sync
# mx2ix: mx2ix.c
# $(CC) $(CFLAGS) -I../zmq_stream -I/opt/open-mx/include mx2ix.c -c -o mx2ix.o
# $(CC) $(CFLAGS) ../drv/rfm.c -c -o rfm.o
# $(CC) $(CFLAGS) -I../zmq_stream ../zmq_stream/simple_pv.c -c -o simple_pv.o
# $(CC) $(FLAGS) -o mx2ix mx2ix.o rfm.o simple_pv.o -L $(API_LIB_PATH) -L/opt/open-mx/lib -lsisci -lmyriexpress -lpthread
# sync
dix_xmit: dix_xmit.c
$(CC) $(CFLAGS) -I../zmq_stream -I/opt/open-mx/include dix_xmit.c -c -o dix_xmit.o
$(CC) $(CFLAGS) ../drv/rfm.c -c -o rfm.o
......@@ -75,25 +50,12 @@ dix_xmit: dix_xmit.c
$(CC) $(FLAGS) -o dix_xmit dix_xmit.o rfm.o simple_pv.o -L $(API_LIB_PATH) -L/opt/open-mx/lib -lmyriexpress -lpthread
sync
# zmq_rcv_ix_xmit: zmq_rcv_ix_xmit.c
# $(CC) $(CFLAGS) -I../zmq_stream zmq_rcv_ix_xmit.c -c -o zmq_rcv_ix_xmit.o
# $(CC) $(CFLAGS) ../drv/rfm.c -c -o rfm.o
# $(CC) $(CFLAGS) -I../zmq_stream ../zmq_stream/simple_pv.c -c -o simple_pv.o
# $(CC) $(CFLAGS) ../zmq_stream/zmq_transport.c -c -o zmq_transport.o
# $(CC) $(CFLAGS) ../zmq_stream/dc_utils.c -c -o dc_utils.o
# $(CC) $(FLAGS) -o zmq_rcv_ix_xmit zmq_rcv_ix_xmit.o rfm.o zmq_transport.o dc_utils.o simple_pv.o -L $(API_LIB_PATH) -lsisci -lzmq -lpthread
# sync
dix_recv: dix_recv.c
$(CC) $(CFLAGS) dix_recv.c -c -o dix_recv.o
$(CC) $(CFLAGS) ../drv/rfm.c -c -o rfm.o
$(CC) $(FLAGS) -o dix_recv dix_recv.o rfm.o -L $(API_LIB_PATH) -lsisci
sync
# datarate: datarate.c
# $(CC) $(CFLAGS) datarate.c -c -o datarate.o
# $(CC) $(FLAGS) -o datarate datarate.o rfm.o param.o crc.o
CPPFLAGS = -I../common -I$(top_srcdir)/common -I$(top_srcdir)/common/bsd -DNO_RTL -D_XOPEN_SOURCE -D_BSD_SOURCE
LDFLAGS = -L/opt/open-mx/lib -lmyriexpress -lpthread -g
......
......@@ -36,14 +36,14 @@ Usage( )
{
printf( "Usage of dix_recv:\n" );
printf( "dix_recv -g <group>\n" );
printf( "-b <name> : Destination buffer name (default local_dc)\n" );
printf( "-m <value> : Size in MB of the destination buffer [20-100] "
printf( " -b <name> : Destination buffer name (default local_dc)\n" );
printf( " -m <value> : Size in MB of the destination buffer [20-100] "
"(default=100)\n" );
printf( " -a <value> : Local adapter number (default %d)\n",
localAdapterNo );
printf( " -g <value> : Reflective group identifier (0..5))\n" );
printf( " -v <value> : Diagnostics level (0..1) \n" );
printf( " -h : This helpscreen\n" );
printf( " -h : This help screen\n" );
printf( "\n" );
}
......
......@@ -50,7 +50,7 @@ void
usage( )
{
fprintf( stderr,
"Usage: dix_xmit [args] -m shared memory size -g IX "
"Usage: dix_ix_xmit [args] -m shared memory size -g IX "
"channel \n" );
fprintf( stderr, "-l filename - log file name\n" );
fprintf( stderr, "-b buffer name - Input buffer [local_dc]\n" );
......@@ -357,7 +357,7 @@ main( int argc, char** argv )
int64_t cur_ref_time = 0;
int festatus = 0;
int pv_festatus = 0;
int ix_xmit_stop = 0;
SimplePV pvs[] = {
{
"RECV_MIN_MS",
......@@ -569,7 +569,7 @@ main( int argc, char** argv )
missed_flag <<= 1;
}
if ( xmitData && !ix_xmit_stop )
if ( xmitData )
{
if ( sendLength > IX_BLOCK_SIZE )
{
......@@ -621,13 +621,6 @@ main( int argc, char** argv )
// network
SCIFlush( sequence, SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY );
}
if ( ix_xmit_stop )
{
ix_xmit_stop--;
if ( ix_xmit_stop == 0 )
fprintf( stderr, "Restarting Dolphin Xmit\n" );
}
}
sprintf( dcstatus, "%ld ", ets );
for ( ii = 0; ii < mytotaldcu; ii++ )
......
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "sisci_types.h"
#include "sisci_api.h"
#include "sisci_error.h"
#include "sisci_demolib.h"
#include "testlib.h"
#include <malloc.h>
#include <signal.h>
#include <unistd.h>
#include "../drv/crc.c"
#include "../include/daq_core.h"
#define __CDECL
#define DO_HANDSHAKE 0
// #define MY_DCU_OFFSET 0x1a00000
#define MY_DCU_OFFSET 0x00000
#define MY_IPC_OFFSET (MY_DCU_OFFSET + 0x8000)
#define MY_GDS_OFFSET (MY_DCU_OFFSET + 0x9000)
#define MY_DAT_OFFSET (MY_DCU_OFFSET + 0xa000)
#include "./dolphin_common.c"
extern void *findSharedMemorySize(char *,int);
static volatile int keepRunning = 1;
/*********************************************************************************/
/* U S A G E */
/* */
/*********************************************************************************/
void Usage()
{
printf("Usage of ix_dc_xmit:\n");
printf("ix_multi_stream -n <nodes> -g <group> -m <models> \n");
printf(" -a <value> : Local adapter number (default %d)\n", localAdapterNo);
printf(" -g <value> : Reflective group identifier (0..3))\n");
printf(" -v <value> : Diagnostics level (0..1))\n");
printf(" -h : This helpscreen\n");
printf("\n");
}
void intHandler(int dummy) {
keepRunning = 0;
}
/*********************************************************************************/
/* M A I N */
/* */
/*********************************************************************************/
int __CDECL
main(int argc,char *argv[])
{
int counter;
int ii;
char *mywriteaddr;
int myCrc;
static const int xmitDataOffset = MY_DAT_OFFSET + sizeof(struct daq_multi_cycle_header_t);
static const int header_size = sizeof(daq_multi_dcu_header_t);
char *sysname;
daq_multi_dcu_data_t *ixDataBlock;
int do_verbose = 0;
int sendLength = 0;
daq_multi_cycle_header_t *xmitHeader;
// daq_multi_dcu_data_t *xmitData;
int lastCycle = 0;
int new_cycle;
char *nextData;
FILE *mydiags;
printf("\n %s compiled %s : %s\n\n",argv[0],__DATE__,__TIME__);
if (argc<2) {
printf("Exiting here \n");
Usage();
return(-1);
}
/* Get the parameters */
while ((counter = getopt(argc, argv, "g:a:v:")) != EOF)
switch(counter) {
case 'g':
segmentId = atoi(optarg);
break;
case 'a':
localAdapterNo = atoi(optarg);
continue;
case 'v':
do_verbose = atoi(optarg);
continue;
case 'h':
Usage();
return(0);
}
// Attach to local shared memory
char *ifo = (char *)findSharedMemorySize("ifo",100);
// Set pointer to ifo data header
daq_multi_cycle_header_t *ifo_header = (daq_multi_cycle_header_t *)ifo;
// Set pointer to ifo data block
char *ifo_data = (char *)ifo + sizeof(daq_multi_cycle_header_t);
// Connect to Dolphin
error = dolphin_init();
printf("Read = 0x%lx \n Write = 0x%lx \n",(long)readAddr,(long)writeAddr);
// Set pointer to xmit header in Dolphin xmit data area.
mywriteaddr = (char *)writeAddr;
mywriteaddr += MY_DAT_OFFSET;
xmitHeader = (daq_multi_cycle_header_t *)mywriteaddr;
// Trap control C exit
signal(SIGINT,intHandler);
// Sync to cycle zero in ifo shared memory (FE receive data area)
for (;ifo_header->curCycle;) usleep(1000);
printf("Found cycle zero = %d\n",ifo_header->curCycle);
printf("Found cycle size = %d\n",ifo_header->cycleDataSize);
int cyclesize = ifo_header->cycleDataSize;
do{
// Wait for cycle count update in ifo shared memory
// Check every 2 milliseconds
do{
usleep(2000);
new_cycle = ifo_header->curCycle;
} while (new_cycle == lastCycle && keepRunning);
// Save cycle number of last received
lastCycle = new_cycle;
// Move pointer to proper data cycle in ifo memory
nextData = (char *)ifo_data;
nextData += cyclesize * new_cycle;
ixDataBlock = (daq_multi_dcu_data_t *)nextData;
// Set the xmit data length to be header size plus data size
sendLength = header_size + ixDataBlock->header.dataBlockSize;
// Print some diagnostics to file
if(new_cycle == 0 && do_verbose > 0)
{
mydiags = fopen("./ix_diags.txt","w");
if(mydiags) {
fprintf(mydiags,"%d %d %d\n", ixDataBlock->header.dcuTotalModels,
(sendLength * 16),
ixDataBlock->header.dcuheader[0].timeSec);
}
fclose(mydiags);
}
// Print some diagnostics to terminal
if(new_cycle == 0 && do_verbose == 2)
{
printf("\t\tNum DCU = %d\n", ixDataBlock->header.dcuTotalModels);
printf("\t\tNew Size = %d\n", ixDataBlock->header.dataBlockSize);
printf("\t\tTime = %d\n", ixDataBlock->header.dcuheader[0].timeSec);
printf("\t\tSend Size = %d\n", sendLength);
printf("\t\tCycle Size = %d\n", DAQ_TRANSIT_DC_DATA_BLOCK_SIZE);
}
// WRITEDATA to Dolphin Network
SCIMemCpy(sequence,ixDataBlock, remoteMap,xmitDataOffset,sendLength,memcpyFlag,&error);
if (error != SCI_ERR_OK) {
fprintf(stderr,"SCIMemCpy failed - Error code 0x%x\n",error);
return error;
}
// Calculate data CRC checksum
myCrc = crc_ptr((char *)ixDataBlock, sendLength, 0);
myCrc = crc_len(sendLength, myCrc);
// Set data header information
xmitHeader->maxCycle = ifo_header->maxCycle;
xmitHeader->cycleDataSize = sendLength;
xmitHeader->msgcrc = myCrc;
// Send cycle last as indication of data ready for receivers
xmitHeader->curCycle = ifo_header->curCycle;
// Have to flush the buffers to make data go onto Dolphin network
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
} while(keepRunning);
// Cleanup the Dolphin connections
error = dolphin_closeout();
// Exit
return SCI_ERR_OK;
}
/*********************************************************************************
* *
* Copyright (C) 1993 - 2015 *
* Dolphin Interconnect Solutions AS *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, *
* or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software *
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
* *
* *
*********************************************************************************/
/*********************************************************************************/
/* */
/* This program demonstrates the use of the SISCI Reflective Memory */
/* functionality available with the Dolphin Express DX technology. */
/* */
/* This functionality is not available for the SCI technology. */
/* */
/*********************************************************************************/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "sisci_types.h"
#include "sisci_api.h"
#include "sisci_error.h"
#include "sisci_demolib.h"
#include "testlib.h"
#include <malloc.h>
#include <signal.h>
#include <unistd.h>
#include "../drv/crc.c"
#include "../include/daqmap.h"
#include "../include/drv/fb.h"
#include "../include/daq_core.h"
#define __CDECL
#define DAQ_RDY_MAX_WAIT 80
#define DO_HANDSHAKE 0
// #define MY_DCU_OFFSET 0x1a00000
#define MY_DCU_OFFSET 0x00000
#define MY_IPC_OFFSET (MY_DCU_OFFSET + 0x1000)
#define MY_GDS_OFFSET (MY_DCU_OFFSET + 0x9000)
#define MY_DAT_OFFSET (MY_DCU_OFFSET + 0xa000)
#include "./dolphin_common.c"
extern void *findSharedMemory(char *);
static struct rmIpcStr *shmIpcPtr[128];
static char *shmDataPtr[128];
static struct cdsDaqNetGdsTpNum *shmTpTable[128];
static const int header_size = sizeof(struct daq_fe_header_t);
int *drIntData;
static const int buf_size = DAQ_DCU_BLOCK_SIZE;
char modelnames[DAQ_TRANSIT_MAX_DCU][64];
char *sysname;
int modelrates[DAQ_TRANSIT_MAX_DCU];
daq_fe_data_t ixDataBlock;
char *daqbuffer = (char *) &ixDataBlock;
char buffer[1024000];
char *zbuffer;
int do_verbose = 1;
int sendLength = 0;
static volatile int keepRunning = 1;
unsigned int loops = 170;
/*********************************************************************************/
/* U S A G E */
/* */
/*********************************************************************************/
void Usage()
{
printf("Usage of ix_multi_stream:\n");
printf("ix_multi_stream -n <nodes> -g <group> -m <models> \n");
printf(" -a <value> : Local adapter number (default %d)\n", localAdapterNo);
printf(" -s <value> : Segment size (default %d)\n", segmentSize);
printf(" -g <value> : Reflective group identifier (0..5))\n");
printf(" -n <value> : Number of receivers\n");
printf(" -l <value> : Loops to execute (default %d)\n", loops);
printf(" -h : This helpscreen\n");
printf("\n");
}
int sync2zero(struct rmIpcStr *ipcPtr) {
int lastCycle = 0;
// Find cycle zero
for (;ipcPtr->cycle;) usleep(1000);
return(lastCycle);
}
void intHandler(int dummy) {
keepRunning = 0;
}
int getmodelrate( char *modelname, char *gds_tp_dir) {
int rate = 0;
char gdsfile[128];
int ii = 0;
FILE *f = 0;
char *token = 0;
char *search = "=";
char line[80];
char *s = 0;
char *s1 = 0;
if (gds_tp_dir) {
sprintf(gdsfile, "%s/tpchn_%s.par", gds_tp_dir, modelname);
} else {
/// Need to get IFO and SITE info from environment variables.
s = getenv("IFO");
for (ii = 0; s[ii] != '\0'; ii++) {
if (isupper(s[ii])) s[ii] = (char) tolower(s[ii]);
}
s1 = getenv("SITE");
for (ii = 0; s1[ii] != '\0'; ii++) {
if (isupper(s1[ii])) s1[ii] = (char) tolower(s1[ii]);
}
sprintf(gdsfile, "/opt/rtcds/%s/%s/target/gds/param/tpchn_%s.par", s1, s, modelname);
}
f = fopen(gdsfile, "rt");
if (!f) return 0;
while(fgets(line,80,f) != NULL) {
line[strcspn(line, "\n")] = 0;
if (strstr(line, "datarate") != NULL) {
token = strtok(line, search);
token = strtok(NULL, search);
if (!token) continue;
while (*token && *token == ' ') {
++token;
}
rate = atoi(token);
break;
}
}
fclose(f);
return rate;
}
int waitServers(int nodes,sci_sequence_t sequence,volatile unsigned int *readAddr,volatile unsigned int *writeAddr)
{
int node_offset;
int value;
/* Lets wait for the servers to write CMD_READY */
printf("Wait for %d receivers ...\n", nodes);
for (node_offset=1; node_offset <= nodes;node_offset++){
int wait_loops = 0;
do {
value = (*(readAddr+IX_SYNC_OFFSET+node_offset));
wait_loops++;
} while (value != CMD_READY && keepRunning);
}
printf("Client received CMD_READY from all nodes \n\n",value);
/* Lets write CMD_READY to offset 0 to signal all servers to go on. */
*(writeAddr+IX_SYNC_OFFSET) = CMD_READY;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
if(!keepRunning) return 1;
else return 0;
}
sci_error_t send_via_reflective_memory(int nsys)
{
unsigned int value;
unsigned int written_value = 0;
int verbose = 1;
int node_offset;
volatile unsigned int *myreadAddr;
volatile unsigned int *mywriteAddr;
int timeout;
int myCrc;
int do_wait = 1;
int daqStatBit[2];
daqStatBit[0] = 1;
daqStatBit[1] = 2;
int reftimeerror = 0;
int refcycle;
int reftimeSec;
int reftimeNSec;
int dataTPLength;
int status;
myreadAddr = (unsigned int *)readAddr;
mywriteAddr = (unsigned int *)writeAddr;
status = waitServers(nodes,sequence,readAddr,writeAddr);
if (status) return SCI_ERR_OK;
mywriteAddr += 256;
myreadAddr += 256;
written_value=1;
int ii;
int new_cycle = 0;
int lastCycle = 0;
unsigned char *dataBuff;
int sync2iop = 1;
// for (;shmIpcPtr[0]->cycle;) usleep(1000);
// printf("Found cycle zero in client\n");
do {
if(sync2iop) {
printf("Syncing to IOP\n");
lastCycle = sync2zero(shmIpcPtr[0]);
sync2iop = 0;
printf("Found cycle zero\n");
}
timeout = 0;
// Wait for a new 1/16Hz DAQ data cycle
do{
usleep(1000);
new_cycle = shmIpcPtr[0]->cycle;
timeout += 1;
}while (new_cycle == lastCycle && timeout < DAQ_RDY_MAX_WAIT);
if(timeout >= DAQ_RDY_MAX_WAIT) {
sync2iop = 1;
lastCycle = sync2zero(shmIpcPtr[0]);
printf("Iop model not running\n");
}
if(sync2iop) continue;
// IOP will be first model ready
// Need to wait for 2K models to reach end of their cycled
usleep((do_wait * 1000));
// Print diags in verbose mode
if(new_cycle == 0 && !do_verbose) {
printf("\nTime = %d-%d size = %d\n",shmIpcPtr[0]->bp[lastCycle].timeSec,shmIpcPtr[0]->bp[lastCycle].timeNSec,sendLength);
printf("\tCycle = ");
for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock.header.dcuheader[ii].cycle);
printf("\n\tTimeSec = ");
for(ii=0;ii<nsys;ii++) printf("\t%d",ixDataBlock.header.dcuheader[ii].timeSec);
printf("\n\tTimeNSec = ");
for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock.header.dcuheader[ii].timeNSec);
printf("\n\tDataSize = ");
for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock.header.dcuheader[ii].dataBlockSize);
printf("\n\tTPCount = ");
for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock.header.dcuheader[ii].tpCount);
printf("\n\tTPSize = ");
for(ii=0;ii<nsys;ii++) printf("\t\t%d",ixDataBlock.header.dcuheader[ii].tpBlockSize);
printf("\n\n ");
}
// Increment the local DAQ cycle counter
lastCycle ++;
lastCycle %= 16;
// Set pointer to 0MQ message data block
zbuffer = (char *)&ixDataBlock.dataBlock[0];
// Initialize data send length to size of message header
sendLength = header_size;
// Set number of FE models that have data in this message
ixDataBlock.header.dcuTotalModels = nsys;
ixDataBlock.header.dataBlockSize = 0;
// Loop thru all FE models
for (ii=0;ii<nsys;ii++) {
reftimeerror = 0;
// Set heartbeat monitor for return to DAQ software
if (lastCycle == 0) shmIpcPtr[ii]->reqAck ^= daqStatBit[0];
// Set DCU ID in header
ixDataBlock.header.dcuheader[ii].dcuId = shmIpcPtr[ii]->dcuId;
// Set DAQ .ini file CRC checksum
ixDataBlock.header.dcuheader[ii].fileCrc = shmIpcPtr[ii]->crc;
// Set 1/16Hz cycle number
ixDataBlock.header.dcuheader[ii].cycle = shmIpcPtr[ii]->cycle;
if(ii == 0) refcycle = shmIpcPtr[ii]->cycle;
// Set GPS seconds
ixDataBlock.header.dcuheader[ii].timeSec = shmIpcPtr[ii]->bp[lastCycle].timeSec;
if (ii == 0) reftimeSec = shmIpcPtr[ii]->bp[lastCycle].timeSec;
// Set GPS nanoseconds
ixDataBlock.header.dcuheader[ii].timeNSec = shmIpcPtr[ii]->bp[lastCycle].timeNSec;
if (ii == 0) reftimeNSec = shmIpcPtr[ii]->bp[lastCycle].timeNSec;
if (ii != 0 && reftimeSec != shmIpcPtr[ii]->bp[lastCycle].timeSec)
reftimeerror = 1;;
if (ii != 0 && reftimeNSec != shmIpcPtr[ii]->bp[lastCycle].timeNSec)
reftimeerror |= 2;;
if(reftimeerror) {
ixDataBlock.header.dcuheader[ii].cycle = refcycle;
// printf("Timing error model %d\n",ii);
// Set Status -- Need to update for models not running
ixDataBlock.header.dcuheader[ii].status = 0xbad;
// Indicate size of data block
ixDataBlock.header.dcuheader[ii].dataBlockSize = 0;
ixDataBlock.header.dcuheader[ii].tpBlockSize = 0;
ixDataBlock.header.dcuheader[ii].tpCount = 0;
} else {
// Set Status -- Need to update for models not running
ixDataBlock.header.dcuheader[ii].status = 2;
// Indicate size of data block
ixDataBlock.header.dcuheader[ii].dataBlockSize = shmIpcPtr[ii]->dataBlockSize;
// Prevent going beyond MAX allowed data size
if (ixDataBlock.header.dcuheader[ii].dataBlockSize > DAQ_DCU_BLOCK_SIZE)
ixDataBlock.header.dcuheader[ii].dataBlockSize = DAQ_DCU_BLOCK_SIZE;
ixDataBlock.header.dcuheader[ii].tpCount = (unsigned int)shmTpTable[ii]->count;
ixDataBlock.header.dcuheader[ii].tpBlockSize = sizeof(float) * modelrates[ii] * ixDataBlock.header.dcuheader[ii].tpCount;
// Prevent going beyond MAX allowed data size
if (ixDataBlock.header.dcuheader[ii].tpBlockSize > DAQ_DCU_BLOCK_SIZE)
ixDataBlock.header.dcuheader[ii].tpBlockSize = DAQ_DCU_BLOCK_SIZE;
memcpy(&(ixDataBlock.header.dcuheader[ii].tpNum[0]),
&(shmTpTable[ii]->tpNum[0]),
sizeof(int)*ixDataBlock.header.dcuheader[ii].tpCount);
// Set pointer to dcu data in shared memory
dataBuff = (char *)(shmDataPtr[ii] + lastCycle * buf_size);
// Copy data from shared memory into local buffer
dataTPLength = ixDataBlock.header.dcuheader[ii].dataBlockSize + ixDataBlock.header.dcuheader[ii].tpBlockSize;
memcpy((void *)zbuffer, dataBuff, dataTPLength);
// Calculate CRC on the data and add to header info
myCrc = crc_ptr((char *)zbuffer, ixDataBlock.header.dcuheader[ii].dataBlockSize, 0); // .crc is crcLength
myCrc = crc_len(ixDataBlock.header.dcuheader[ii].dataBlockSize, myCrc);
ixDataBlock.header.dcuheader[ii].dataCrc = myCrc;
// Increment the 0mq data buffer pointer for next FE
zbuffer += dataTPLength;
// Increment the 0mq message size with size of FE data block
sendLength += dataTPLength;
// Increment the data block size for the message
ixDataBlock.header.dataBlockSize += (unsigned int)dataTPLength;
// Update heartbeat monitor to DAQ code
if (lastCycle == 0) shmIpcPtr[ii]->reqAck ^= daqStatBit[1];
}
}
// Copy data to combined message buffer
memcpy(buffer,daqbuffer,sendLength);
// Send Data
if (!verbose) {
printf("Send broadcast message (value %d) to %d available nodes ...\n",written_value,nodes);
}
/* Lets write the value to offset 0 */
// WRITEDATA
SCIMemCpy(sequence,buffer, remoteMap,MY_DAT_OFFSET,sendLength,memcpyFlag,&error);
if (error != SCI_ERR_OK) {
fprintf(stderr,"SCIMemCpy failed - Error code 0x%x\n",error);
return error;
}
*mywriteAddr = written_value;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
//printf("Writing cycle %d with size %d \n",lastCycle,sendLength);
/* Lets wait for the servers to write the written value +1 */
for (node_offset=1; node_offset <= nodes;node_offset++){
int wait_loops = 0;
do {
value = (*(myreadAddr+node_offset));
wait_loops++;
if ((wait_loops % 10000000)==0){
printf("Value = %d (expected = %d) delayed from rank %d - after %u reads\n", value, written_value+1, node_offset, wait_loops);
return SCI_ERR_OK;
}
} while (value != written_value+1);
}
if (!verbose) {
printf("Received broadcast ack %d from all nodes \n\n",value);
}
written_value++;
} while (keepRunning); /* do this number of loops */
printf("\n***********************************************************\n\n");
return SCI_ERR_OK;
}
/*********************************************************************************/
/* M A I N */
/* */
/*********************************************************************************/
int __CDECL
main(int argc,char *argv[])
{
int counter;
int nsys = 1;
int dcuId[10];
int ii;
char *gds_tp_dir = 0;
printf("\n %s compiled %s : %s\n\n",argv[0],__DATE__,__TIME__);
if (argc<3) {
Usage();
return(-1);
}
/* Get the parameters */
while ((counter = getopt(argc, argv, "r:n:g:l:s:m:h:a:")) != EOF)
switch(counter) {
case 'r':
rank = atoi(optarg);
break;
case 'n':
nodes = atoi(optarg);
break;
case 'g':
segmentId = atoi(optarg);
break;
case 'l':
loops = atoi(optarg);
break;
case 's':
segmentSize = atoi(optarg);
if (segmentSize < 4096){
printf("Min segment size is 4 KB\n");
return -1;
}
break;
case 'm':
sysname = optarg;
printf ("sysnames = %s\n",sysname);
continue;
case 'a':
localAdapterNo = atoi(optarg);
continue;
case 'h':
Usage();
return(0);
}
if(sysname != NULL) {
printf("System names: %s\n", sysname);
sprintf(modelnames[0],"%s",strtok(sysname, " "));
for(;;) {
char *s = strtok(0, " ");
if (!s) break;
sprintf(modelnames[nsys],"%s",s);
dcuId[nsys] = 0;
nsys++;
}
} else {
Usage();
return(0);
}
for(ii=0;ii<nsys;ii++) {
char shmem_fname[128];
sprintf(shmem_fname, "%s_daq", modelnames[ii]);
void *dcu_addr = findSharedMemory(shmem_fname);
if (dcu_addr <= 0) {
fprintf(stderr, "Can't map shmem\n");
exit(-1);
} else {
printf(" %s mapped at 0x%lx\n",modelnames[ii],(unsigned long)dcu_addr);
}
shmIpcPtr[ii] = (struct rmIpcStr *)((char *)dcu_addr + CDS_DAQ_NET_IPC_OFFSET);
shmDataPtr[ii] = ((char *)dcu_addr + CDS_DAQ_NET_DATA_OFFSET);
shmTpTable[ii] = (struct cdsDaqNetGdsTpNum *)((char *)dcu_addr + CDS_DAQ_NET_GDS_TP_TABLE_OFFSET);
}
for (ii = 0; ii < nsys; ii++) {
modelrates[ii] = getmodelrate(modelnames[ii], gds_tp_dir);
if (modelrates[ii] == 0) {
fprintf(stderr, "Unable to determine the rate of %s\n", modelnames[ii]);
exit(1);
}
}
for (;shmIpcPtr[0]->cycle;) usleep(1000);
int lastCycle = 0;
printf("Found cycle zero \n");
int new_cycle;
// Wait for cycle count update from FE
do{
usleep(1000);
new_cycle = shmIpcPtr[0]->cycle;
} while (new_cycle == lastCycle);
printf("New cycle = %d\n", shmIpcPtr[0]->cycle);
printf("Size of rmIpcStr = %ld\n", sizeof(struct rmIpcStr));
printf("Size of GDStp = %ld\n", sizeof(struct cdsDaqNetGdsTpNum));
drIntData = (int *)shmDataPtr[0];
drIntData += 2;
printf("CPU = %d\n",*drIntData);
signal(SIGINT,intHandler);
error = dolphin_init();
printf("Read = 0x%lx \n Write = 0x%lx \n",(long)readAddr,(long)writeAddr);
do {
error = send_via_reflective_memory(nsys);
} while (error == SCI_ERR_OK && keepRunning == 1);
error = dolphin_closeout();
return SCI_ERR_OK;
}
/*********************************************************************************
* *
* Copyright (C) 1993 - 2015 *
* Dolphin Interconnect Solutions AS *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, *
* or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software *
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
* *
* *
*********************************************************************************/
/*********************************************************************************/
/* */
/* This program demonstrates the use of the SISCI Reflective Memory */
/* functionality available with the Dolphin Express DX technology. */
/* */
/* This functionality is not available for the SCI technology. */
/* */
/*********************************************************************************/
#ifdef _WIN32
#ifndef OS_IS_WINDOWS
#define OS_IS_WINDOWS 1
#endif /* !OS_IS_WINDOWS */
#endif /* _WIN32 */
#ifdef OS_IS_WINDOWS
#include <windows.h>
#else
#include <unistd.h>
#endif
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "sisci_types.h"
#include "sisci_api.h"
#include "sisci_error.h"
#include "sisci_demolib.h"
#include "testlib.h"
#include <malloc.h>
#include "../drv/crc.c"
#include "../include/daqmap.h"
#include "../include/drv/fb.h"
#include "../include/daq_core.h"
#define __CDECL
#define NO_CALLBACK NULL
#define NO_FLAGS 0
#define DATA_TRANSFER_READY 8
#define CMD_READY 1234
/* Use upper 4 KB of segment for synchronization. */
#define SYNC_OFFSET ((segmentSize) / 4 - 1024)
#define FILTER 0x12345
#define MATCH_VAL 0xabcdef
#define DFLT_EID 1
#define DFLT_LEN 8192
#define DFLT_END 128
#define MAX_LEN (1024*1024*1024)
#define DFLT_ITER 1000
#define NUM_RREQ 16 /* currently constrained by MX_MCP_RDMA_HANDLES_CNT*/
#define NUM_SREQ 256 /* currently constrained by MX_MCP_RDMA_HANDLES_CNT*/
#define DO_HANDSHAKE 0
#define MATCH_VAL_MAIN (1 << 31)
#define MATCH_VAL_THREAD 1
// #define MY_DCU_OFFSET 0x1a00000
#define MY_DCU_OFFSET 0x00000
#define MY_IPC_OFFSET (MY_DCU_OFFSET + 0x8000)
#define MY_GDS_OFFSET (MY_DCU_OFFSET + 0x9000)
#define MY_DAT_OFFSET (MY_DCU_OFFSET + 0xa000)
#include "./dolphin_common.c"
extern void *findSharedMemory(char *);
static struct rmIpcStr *shmIpcPtr[128];
static char *shmDataPtr[128];
static struct cdsDaqNetGdsTpNum *shmTpTable[128];
static const int header_size = sizeof(struct daq_fe_header_t);
static const int buf_size = DAQ_DCU_BLOCK_SIZE;
char modelnames[DAQ_TRANSIT_MAX_DCU][64];
char *sysname;
int modelrates[DAQ_TRANSIT_MAX_DCU];
daq_multi_dcu_data_t ixDataBlock;
char *daqbuffer = (char *) &ixDataBlock;
daq_fe_data_t *zbuffer;
unsigned int loops = 170;
static const int ipcSize = sizeof(struct daq_msg_header_t);
int signOff(int rank,sci_sequence_t sequence,volatile unsigned int *readAddr,volatile unsigned int *writeAddr)
{
/* Lets write CMD_READY the to client, offset "myrank" */
*(writeAddr+SYNC_OFFSET+rank) = 0;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
}
int waitSender(int rank,sci_sequence_t sequence,volatile unsigned int *readAddr,volatile unsigned int *writeAddr)
{
int wait_loops = 0;
int value;
/* Lets write CMD_READY the to client, offset "myrank" */
*(writeAddr+SYNC_OFFSET+rank) = CMD_READY;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
printf("Wait for CMD_READY from master ...\n");
/* Lets wait for the client to send me CMD_READY in offset 0 */
do {
value = (*(readAddr+SYNC_OFFSET));
wait_loops++;
if ((wait_loops % 100000000)==0) {
/* Lets again write CMD_READY to the client, offset "myrank" */
*(writeAddr+SYNC_OFFSET+rank) = CMD_READY;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
}
} while (value != CMD_READY);
printf("Server received CMD_READY\n");
*(writeAddr+SYNC_OFFSET+rank) = 0;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
}
/*********************************************************************************/
/* U S A G E */
/* */
/*********************************************************************************/
void Usage()
{
printf("Usage of reflective:\n");
printf("reflective -client -nodes <nodes>[ -a <adapter no> -size <segment size> ] \n");
printf("reflective -server -rank <rank> [ -a <adapter no> -size <segment size> ] \n\n");
printf(" -client : %s\n", (client) ? "The local node is client" : "The local node is server");
printf(" -a <value> : Local adapter number (default %d)\n", localAdapterNo);
printf(" -size <value> : Segment size (default %d)\n", segmentSize);
printf(" -group <value> : Reflective group identifier (0..5))\n");
printf(" -rank <value> : Rank of server nodes (1,2,3,4,5,6,7, 8,9)\n");
printf(" -nodes <value> : Number of servers.\n");
printf(" -loops <value> : Loops to execute (default %d)\n", loops);
printf(" -help : This helpscreen\n");
printf("\n");
}
sci_error_t ix_rcv_reflective_memory()
{
unsigned int value;
unsigned int written_value = 0;
double average;
timer_start_t timer_start;
int verbose = 1;
int node_offset;
printf("Read = 0x%lx \n Write = 0x%lx \n",(long)readAddr,(long)writeAddr);
/* Perform a barrier operation. The client acts as master. */
waitSender(rank,sequence,readAddr,writeAddr);
printf("\n***********************************************************\n\n");
printf("Loops: %d\n", loops);
writeAddr += 256;
readAddr += 256;
written_value=1;
int ii;
int new_cycle = 0;
int lastCycle = 0;
int msgSize = 0;
do {
int wait_loops = 0;
/* Lets wait for the client to send me a value in offset 0 */
if (verbose) {
printf("Wait for broadcast message...\n");
}
do {
value = (*(readAddr));
wait_loops++;
// if ((wait_loops % 10000000)==0) {
// printf("Value = %d delayed from Client - written_value=%d\n", value,written_value);
// }
} while (value != written_value);
printf("zbuff count = %d\n",zbuffer->header.dcuTotalModels);
printf("zbuff size = %d\n",zbuffer->header.dataBlockSize);
printf("zbuff dcuid = %d\n",zbuffer->header.dcuheader[0].dcuId);
printf("zbuff cycle = %d\n",zbuffer->header.dcuheader[0].cycle);
msgSize = header_size + zbuffer->header.dataBlockSize;
printf("buff size = %d\t%d\t%d\n",msgSize,header_size,ipcSize);
if (verbose) {
printf("Received broadcast value %d \n",value);
}
written_value++;
/* Lets write a value back received value +1 to the client, offset "myrank" */
*(writeAddr+rank) = written_value;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
} while (written_value < loops); /* do this number of loops */
printf("\n***********************************************************\n\n");
/* Lets clean up after demonstrating the use of reflective memory */
return SCI_ERR_OK;
}
/*********************************************************************************/
/* M A I N */
/* */
/*********************************************************************************/
int __CDECL
main(int argc,char *argv[])
{
int counter;
volatile unsigned char *daq_read_addr;
printf("\n %s compiled %s : %s\n\n",argv[0],__DATE__,__TIME__);
if (argc<3) {
Usage();
return(-1);
}
/* Get the parameters */
for (counter=1; counter<argc; counter++) {
if (!strcmp("-rank",argv[counter])) {
/*LINTED*/
rank = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-nodes",argv[counter])) {
/*LINTED*/
nodes = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-group",argv[counter])) {
/*LINTED*/
segmentId = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-loops",argv[counter])) {
loops = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-size",argv[counter])) {
segmentSize = strtol(argv[counter+1],(char **) NULL,10);
if (segmentSize < 4096){
printf("Min segment size is 4 KB\n");
return -1;
}
continue;
}
if (!strcmp("-models",argv[counter])) {
sysname = argv[counter+1];
printf ("sysnames = %s\n",sysname);
continue;
}
if (!strcmp("-a",argv[counter])) {
localAdapterNo = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-rn",argv[counter])) {
continue;
}
if (!strcmp("-client",argv[counter])) {
client = 1;
continue;
}
if (!strcmp("-server",argv[counter])) {
server = 1;
continue;
}
if (!strcmp("-help",argv[counter])) {
Usage();
return(0);
}
}
error = dolphin_init();
printf("Read = 0x%lx \n Write = 0x%lx \n",(long)readAddr,(long)writeAddr);
daq_read_addr = (unsigned char *)readAddr + MY_DAT_OFFSET;
zbuffer = (daq_fe_data_t *)daq_read_addr;
printf("Calling recvr \n");
error = ix_rcv_reflective_memory();
signOff(rank,sequence,readAddr,writeAddr);
error = dolphin_closeout();
return SCI_ERR_OK;
}
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "sisci_types.h"
#include "sisci_api.h"
#include "sisci_error.h"
#include "sisci_demolib.h"
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "testlib.h"
#include <malloc.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
#include <time.h>
#include "../drv/crc.c"
#include "../include/daqmap.h"
#include "../include/drv/fb.h"
#include "../include/daq_core.h"
#define __CDECL
#define DO_HANDSHAKE 0
// #define MY_DCU_OFFSET 0x1a00000
#define MY_DCU_OFFSET 0x00000
#define MY_IPC_OFFSET (MY_DCU_OFFSET + 0x1000)
#define MY_GDS_OFFSET (MY_DCU_OFFSET + 0x9000)
#define MY_DAT_OFFSET (MY_DCU_OFFSET + 0xa000)
#include "./dolphin_common.c"
extern void *findSharedMemorySize(char *,int);
static struct rmIpcStr *shmIpcPtr[128];
static char *shmDataPtr[128];
static struct cdsDaqNetGdsTpNum *shmTpTable[128];
static const int header_size = sizeof(struct daq_fe_header_t);
static const int buf_size = DAQ_DCU_BLOCK_SIZE;
char modelnames[DAQ_TRANSIT_MAX_DCU][64];
char *sysname;
int modelrates[DAQ_TRANSIT_MAX_DCU];
daq_multi_dcu_data_t *ixDataBlock;
static const int mdcu_header_size = sizeof(struct daq_multi_dcu_header_t);
daq_fe_data_t *zbuffer;
unsigned int loops = 170;
static const int ipcSize = sizeof(struct daq_msg_header_t);
unsigned int tstatus[16];
int thread_index[DCU_COUNT];
daq_multi_dcu_data_t mxDataBlockG[32][16];
int stop_working_threads = 0;
int start_acq = 0;
static volatile int keepRunning = 1;
daq_multi_cycle_data_t *mcd;
int waitSender(int rank,sci_sequence_t sequence,volatile unsigned int *readAddr,volatile unsigned int *writeAddr)
{
int wait_loops = 0;
int value;
/* Lets write CMD_READY the to client, offset "myrank" */
*(writeAddr+IX_SYNC_OFFSET+rank) = CMD_READY;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
printf("Wait for CMD_READY from master ...\n");
/* Lets wait for the client to send me CMD_READY in offset 0 */
do {
value = (*(readAddr+IX_SYNC_OFFSET));
wait_loops++;
if ((wait_loops % 100000000)==0) {
/* Lets again write CMD_READY to the client, offset "myrank" */
*(writeAddr+IX_SYNC_OFFSET+rank) = CMD_READY;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
}
} while (value != CMD_READY);
printf("Server received CMD_READY\n");
*(writeAddr+IX_SYNC_OFFSET+rank) = 0;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
}
void intHandler(int dummy) {
keepRunning = 0;
}
static int64_t
s_clock (void)
{
struct timeval tv;
gettimeofday (&tv, NULL);
return (int64_t) (tv.tv_sec * 1000 + tv.tv_usec / 1000);
}
/*********************************************************************************/
/* U S A G E */
/* */
/*********************************************************************************/
void Usage()
{
printf("Usage of reflective:\n");
printf("reflective -client -nodes <nodes>[ -a <adapter no> -size <segment size> ] \n");
printf("reflective -server -rank <rank> [ -a <adapter no> -size <segment size> ] \n\n");
printf(" -client : %s\n", (client) ? "The local node is client" : "The local node is server");
printf(" -a <value> : Local adapter number (default %d)\n", localAdapterNo);
printf(" -size <value> : Segment size (default %d)\n", segmentSize);
printf(" -group <value> : Reflective group identifier (0..5))\n");
printf(" -rank <value> : Rank of server nodes (1,2,3,4,5,6,7, 8,9)\n");
printf(" -nodes <value> : Number of servers.\n");
printf(" -loops <value> : Loops to execute (default %d)\n", loops);
printf(" -help : This helpscreen\n");
printf("\n");
}
void *ix_rcvr_thread(void *arg)
{
unsigned int value;
unsigned int written_value = 0;
double average;
int verbose = 1;
int node_offset;
int *mythread = (int *)arg;
int mt = *mythread;
int ii;
int cycle = 0;
int lastCycle = 0;
int msgSize = 0;
volatile unsigned char *daq_read_addr;
daq_fe_data_t *ixbuffer;
volatile unsigned int *myreadAddr;
volatile unsigned int *mywriteAddr;
myreadAddr = (unsigned int *)readAddr;
mywriteAddr = (unsigned int *)writeAddr;
daq_read_addr = (unsigned char *)readAddr + MY_DAT_OFFSET;
ixbuffer = (daq_fe_data_t *)daq_read_addr;
ixbuffer += mt;
printf("Read = 0x%lx \n",(long)ixbuffer);
/* Perform a barrier operation. The client acts as master. */
waitSender(rank,sequence,readAddr,writeAddr);
printf("\n***********************************************************\n\n");
printf("Loops: %d\n", loops);
mywriteAddr += 256;
myreadAddr += 256;
written_value=1;
do {
int wait_loops = 0;
/* Lets wait for the client to send me a value in offset 0 */
if (!verbose) {
printf("Wait for broadcast message...\n");
}
do {
value = (*(myreadAddr));
wait_loops++;
// if ((wait_loops % 10000000)==0) {
// printf("Value = %d delayed from Client - written_value=%d\n", value,written_value);
// }
} while (value != written_value);
cycle = ixbuffer->header.dcuheader[0].cycle;
msgSize = header_size + ixbuffer->header.dataBlockSize;
char *localbuff = (char *)&mxDataBlockG[mt][cycle];
memcpy(localbuff,ixbuffer,msgSize);
#if 0
printf("zbuff count = %d\n",ixbuffer->header.dcuTotalModels);
printf("zbuff size = %d\n",ixbuffer->header.dataBlockSize);
printf("zbuff cycle = %d\n",cycle);
printf("zbuff dcuid = %d\n",ixbuffer->header.dcuheader[0].dcuId);
printf("buff size = %d\t%d\t%d\n",msgSize,header_size,ipcSize);
#endif
if (!verbose) {
printf("Received broadcast value %d \n",cycle);
}
tstatus[cycle] |= (1 << mt);
written_value++;
/* Lets write a value back received value +1 to the client, offset "myrank" */
*(mywriteAddr+rank) = written_value;
SCIFlush(sequence,SCI_FLAG_FLUSH_CPU_BUFFERS_ONLY);
} while (!stop_working_threads); /* do this number of loops */
printf("Stopping thread %d\n",mt);
usleep(200000);
return 0;
}
/*********************************************************************************/
/* M A I N */
/* */
/*********************************************************************************/
int __CDECL
main(int argc,char *argv[])
{
int counter;
volatile unsigned char *daq_read_addr;
pthread_t thread_id[4];
int ii,jj;
int dataRdy = 0;
int loop = 0;
daq_msg_header_t *sendheader;
daq_msg_header_t *rcvheader;
char *senddata;
char *rcvdata;
int myc;
char *mcdDataPtr;
printf("\n %s compiled %s : %s\n\n",argv[0],__DATE__,__TIME__);
printf("Size of mcd = %d\n",sizeof(mcd));
if (argc<3) {
Usage();
return(-1);
}
/* Get the parameters */
for (counter=1; counter<argc; counter++) {
if (!strcmp("-rank",argv[counter])) {
/*LINTED*/
rank = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-nodes",argv[counter])) {
/*LINTED*/
nodes = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-group",argv[counter])) {
/*LINTED*/
segmentId = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-size",argv[counter])) {
segmentSize = strtol(argv[counter+1],(char **) NULL,10);
if (segmentSize < 4096){
printf("Min segment size is 4 KB\n");
return -1;
}
continue;
}
if (!strcmp("-a",argv[counter])) {
localAdapterNo = strtol(argv[counter+1],(char **) NULL,10);
continue;
}
if (!strcmp("-help",argv[counter])) {
Usage();
return(0);
}
}
signal(SIGINT,intHandler);
error = dolphin_init();
printf("Read = 0x%lx \n Write = 0x%lx \n",(long)readAddr,(long)writeAddr);
char *ifo = (char *)findSharedMemorySize("ifo",100);
daq_multi_cycle_header_t *ifo_header = (daq_multi_cycle_header_t *)ifo;
char *ifo_data = (char *)ifo + sizeof(daq_multi_cycle_header_t);
printf("Starting recvr threads\n");
for(ii=0;ii<nodes;ii++)
{
thread_index[ii] = ii;
pthread_create(&thread_id[ii],NULL,ix_rcvr_thread,(void *)&thread_index);
dataRdy |= (1 << ii);
}
int timeout = 0;
int resync = 1;
int64_t mytime = 0;
int64_t mylasttime = 0;
int64_t myptime = 0;
int mytotaldcu = 0;
do {
if(resync) {
loop = 0;
do {
usleep(2000);
timeout += 1;
}while(tstatus[loop] == 0 && timeout < 5000);
for(ii=0;ii<16;ii++) tstatus[ii] = 0;
printf("RESYNC ***************** \n");
}
// Wait until received data from at least 1 FE
timeout = 0;
do {
usleep(2000);
timeout += 1;
}while(tstatus[loop] == 0 && timeout < 5000 && !resync);
resync = 0;
// If timeout, not getting data from anyone.
if(timeout >= 5000) resync = 1;
if (resync) continue;
// Wait until data received from everyone
timeout = 0;
do {
usleep(1000);
timeout += 1;
}while(tstatus[loop] != dataRdy && timeout < 5);
// If timeout, not getting data from everyone.
// TODO: MARK MISSING FE DATA AS BAD
// Clear thread rdy for this cycle
tstatus[loop] = 0;
// Timing diagnostics
mytime = s_clock();
myptime = mytime - mylasttime;
mylasttime = mytime;
if(loop == 0) {
printf("Data rdy for cycle = %d\t%ld\n",mxDataBlockG[0][loop].header.dcuheader[0].timeSec,myptime);
printf("\tdatasize = %d\n",mxDataBlockG[0][loop].header.dataBlockSize);
}
// Reset total DCU counter
mytotaldcu = 0;
// daq_multi_cycle_header_t *ifo_header = (daq_multi_cycle_header_t header *)ifo;
// char *ifo_data = (char *)ifo + sizeof(daq_multi_cycle_header_t);
mcdDataPtr = ifo_data;
mcdDataPtr += loop * DAQ_TRANSIT_DC_DATA_BLOCK_SIZE;
ixDataBlock = (daq_multi_dcu_data_t *)mcdDataPtr;
sendheader = (daq_msg_header_t *) &ixDataBlock->header.dcuheader[0];
senddata = (char *) &ixDataBlock->dataBlock[0];
int sendDataBlockSize = 0;
// Loop over all data buffers received from FE computers
for(ii=0;ii<nodes;ii++) {
// for(jj=0;jj<2;jj++) {
// Get pointers to receiver header and data blocks
rcvheader = (daq_msg_header_t *) &mxDataBlockG[ii][loop].header.dcuheader[0];
rcvdata = (char *) &mxDataBlockG[ii][loop].dataBlock[0];
// Get the receive number of dcu
myc = mxDataBlockG[ii][loop].header.dcuTotalModels;
// Add rcv dcu count to my total send count
mytotaldcu += myc;
// Calc size of receive header data to move
int headersize = myc * ipcSize;
// Copy rcv header area to send header
memcpy(sendheader,rcvheader,headersize);
// Increment send header for next dcu
sendheader += myc;
// Get the size of received data to move
sendDataBlockSize += mxDataBlockG[ii][loop].header.dataBlockSize;
// Copy rcv data to send data block
memcpy(senddata,rcvdata,sendDataBlockSize);
// Update the send header with dcu and size info
ixDataBlock->header.dcuTotalModels = mytotaldcu;
ixDataBlock->header.dataBlockSize = sendDataBlockSize;
// }
}
ifo_header->curCycle = loop;
ifo_header->cycleDataSize = DAQ_TRANSIT_DC_DATA_BLOCK_SIZE;
ifo_header->maxCycle = 16;
#if 0
if(loop == 0) {
printf("New Send Data:\n");
printf("\tNum of DCU: %d\n",ixDataBlock->header.dcuTotalModels);
printf("\tDataSize: %d\n",ixDataBlock->header.dataBlockSize);
for(ii=0;ii<ixDataBlock->header.dcuTotalModels;ii++)
{
printf("\tDCUID %d = %d\n",ii,ixDataBlock->header.dcuheader[ii].cycle);
}
}
#endif
loop ++;
loop %= 16;
}while (keepRunning);
printf("stopping threads %d \n",nodes);
stop_working_threads = 1;
// Wait for threads to stop
sleep(2);
error = dolphin_closeout();
return SCI_ERR_OK;
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -63,8 +63,8 @@ int daqStatBit[ 2 ];
void
Usage( )
{
fprintf( stderr, "Usage of mx_fe:\n" );
fprintf( stderr, "mx_fe -s <models> <OPTIONS>\n" );
fprintf( stderr, "Usage of local_dc:\n" );
fprintf( stderr, "local_dc -s <models> <OPTIONS>\n" );
fprintf( stderr,
" -b <buffer> : Name of the mbuf to concentrate the data to "
"locally (defaults to ifo)\n" );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment