diff --git a/src/ix_stream/mx2ix.c b/src/ix_stream/mx2ix.c index 6406c74e2f9d452945f422c596ee9fe985b6e7d9..9b5e84a9f1cc8ac75607441939301c0c326e53db 100644 --- a/src/ix_stream/mx2ix.c +++ b/src/ix_stream/mx2ix.c @@ -72,6 +72,7 @@ MX_MUTEX_T stream_mutex; #define MATCH_VAL_THREAD 1 #define THREADS_PER_NIC 16 #define IX_STOP_SEC 5 + static int xmitDataOffset[IX_BLOCK_COUNT]; daq_multi_cycle_header_t *xmitHeader[IX_BLOCK_COUNT]; @@ -459,7 +460,6 @@ main(int argc, char **argv) int missed_nsys[MAX_FE_COMPUTERS]; int64_t recv_time[MAX_FE_COMPUTERS]; int64_t min_recv_time = 0; - int64_t cur_ref_time = 0; int recv_buckets[(MAX_DELAY_MS/5)+2]; int festatus = 0; int pv_festatus = 0; @@ -588,6 +588,9 @@ main(int argc, char **argv) } timeout += 1; }while(!any_rdy && timeout < 50); +#ifndef TIME_INTERVAL_DIAG + mytime = s_clock(); +#endif // Wait up to delay_ms ms in 1/10ms intervals until data received from everyone or timeout timeout = 0; @@ -600,13 +603,22 @@ main(int argc, char **argv) timeout += 1; }while(threads_rdy < nsys && timeout < delay_cycles); if(timeout >= 100) rcv_errors += (nsys - threads_rdy); +#ifndef TIME_INTERVAL_DIAG + mylasttime = s_clock(); +#endif if(any_rdy) { int tbsize = 0; - // Timing diagnostics +#ifdef TIME_INTERVAL_DIAG + // Timing diagnostics for time between cycles mytime = s_clock(); myptime = mytime - mylasttime; mylasttime = mytime; +#else + // Timing diagnostics for rcv window + myptime = mylasttime - mytime; +#endif + if (myptime < min_cycle_time) { min_cycle_time = myptime; } @@ -722,7 +734,6 @@ main(int argc, char **argv) // Calc IX message size sendLength = header_size + ifoDataBlock->header.fullDataBlockSize; for (ii = 0; ii < nsys; ++ii) { - cur_ref_time = 0; recv_time[ii] -= min_recv_time; } datablock_size_running += dc_datablock_size; diff --git a/src/mx_stream/mx_fe.c b/src/mx_stream/mx_fe.c index 2f1f5fc329e0fcae971c274ddda142773e273999..b9f0af6fe103d9d0ac09b80901ec89d8e52386fa 100644 --- a/src/mx_stream/mx_fe.c +++ b/src/mx_stream/mx_fe.c @@ -79,6 +79,7 @@ size_t cycle_data_size; char msg_buffer[MSG_BUF_SIZE]; int symmetricom_fd = -1; +int daqStatBit[2]; /*********************************************************************************/ @@ -278,9 +279,6 @@ int loadMessageBuffer( int nsys, { int sendLength = 0; int ii; - int daqStatBit[2]; - daqStatBit[0] = 4; - daqStatBit[1] = 8; int dataXferSize; char *dataBuff; int myCrc = 0; @@ -368,9 +366,6 @@ int send_to_local_memory(int nsys, uint16_t my_dcu) { int do_wait = 1; - int daqStatBit[2]; - daqStatBit[0] = 1; - daqStatBit[1] = 2; char *nextData; @@ -460,10 +455,16 @@ int send_to_local_memory(int nsys, myErrorSignal = 1; break; } - mx_wait(ep, &req[cur_req], 150, &stat, &result); - if (!result) { - fprintf(stderr, "mxWait failed with status %s\n", mx_strstatus(stat.code)); - myErrorSignal = 1; +again: + res = mx_wait(ep, &req[cur_req], 50, &stat, &result); + if (res != MX_SUCCESS) { + fprintf(stderr, "mx_cancel() failed with status %s\n", mx_strerror(res)); + exit(1); + } + if (result == 0) { + fprintf(stderr, "trying again \n"); + goto again; + // myErrorSignal = 1; } if (stat.code != MX_STATUS_SUCCESS) { fprintf(stderr, "isendxxx failed with status %s\n", mx_strstatus(stat.code)); @@ -522,8 +523,6 @@ main(int argc,char *argv[]) extern char *optarg; mx_return_t ret; - mx_init(); - MX_MUTEX_INIT(&stream_mutex); rem_host = NULL; sysname = NULL; @@ -608,13 +607,24 @@ main(int argc,char *argv[]) // If sending to DAQ via net enabled, ensure all options have been set if (sendViaOmx) { + mx_init(); + MX_MUTEX_INIT(&stream_mutex); if(my_eid == DFLT_EID || his_eid == DFLT_EID){ fprintf(stderr, "\n***ERROR\n***Must set both -e and -r options to send data to DAQ\n\n"); Usage(); return(0); } fprintf(stderr,"Writing DAQ data to local shared memory and sending out on Open-MX\n"); + if(my_eid == 0) { + daqStatBit[0] = 1; + daqStatBit[1] = 2; + } else { + daqStatBit[0] = 4; + daqStatBit[1] = 8; + } } else { + daqStatBit[0] = 1; + daqStatBit[1] = 2; fprintf(stderr,"Writing DAQ data to local shared memory only \n"); }