Skip to content
Snippets Groups Projects
Commit 13c91952 authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Updates to producer_dc_zmq.cc which allow data to flow properly.

Fixes to gps handling.

Use the data stream as the clock, the zmq data transfer has more jitter than the mx, so sometimes transfers go long, sometimes short, allow for that.

This version is the first that could be arrange the data well enough for frames to be written.

Still needs some correctness testing.  Lots of printf/cout debugging code will need to be removed.

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4550 6dcd42c9-f523-4c6d-aada-af552506706e
parent 60a8436a
No related branches found
No related tags found
No related merge requests found
......@@ -60,6 +60,7 @@ using namespace std;
#include "raii.hh"
#include "conv.hh"
#include "circ.h"
extern daqd_c daqd;
extern int shutdown_server();
......@@ -142,7 +143,7 @@ void *producer::frame_writer() {
int cycle_delay = daqd.cycle_delay;
// Wait until a second boundary
{
/*{
if ((daqd.dcu_status_check & 4) == 0) {
if (daqd.symm_ok() == 0) {
......@@ -172,9 +173,34 @@ void *producer::frame_writer() {
prev_gps, frac, f);
controller_cycle = 1;
}
}
}*/
zmq_receiver.begin_acquiring();
// Wait until a second ends, so that the next data sould
// come in on cycle 0
// use the data as the clock here
int sync_tries = 0;
while (true) {
const int max_sync_tries = 10 * DATA_BLOCKS;
zmq_dc::data_block block = zmq_receiver.receive_data();
++sync_tries;
if (block.full_data_block->dcuTotalModels == 0)
continue;
gps = block.full_data_block->zmqheader[0].timeSec;
frac = block.full_data_block->zmqheader[0].timeNSec;
// as of 8 Nov 2017 zmq_multi_stream sends the gps nanoseconds as a cycle number
if (frac == DATA_BLOCKS-1 || frac >= 937500000)
break;
if (sync_tries > max_sync_tries) {
std::cerr << "Unable to sync up to front ends after " << sync_tries << " attempts" << std::endl;
exit(1);
}
}
prev_gps = gps;
prev_frac = frac;
PV::set_pv(PV::PV_UPTIME_SECONDS, 0);
PV::set_pv(PV::PV_GPS, 0);
......@@ -197,6 +223,23 @@ void *producer::frame_writer() {
// retreive 1/16s of data from zmq
zmq_dc::data_block zmq_data_block = zmq_receiver.receive_data();
std::cout << "#" << std::endl;
if (zmq_data_block.full_data_block->dcuTotalModels > 0) {
gps = zmq_data_block.full_data_block->zmqheader[0].timeSec;
frac = zmq_data_block.full_data_block->zmqheader[0].timeNSec;
bool new_sec = (i % 16) == 0;
bool is_good = false;
if (new_sec) {
is_good = (gps == prev_gps + 1 && frac == 0);
} else {
const unsigned int step = 1000000000/16;
is_good = (gps == prev_gps && ((frac == prev_frac + 1) || (frac == prev_frac + step)));
}
if (!is_good) {
std::cerr << "###################################\n\n\nGlitch in receive\n"
<< "prev " << prev_gps << ":" << prev_frac << " cur " << gps << ":" << frac << std::endl;
}
}
// map out the order of the dcuids in the zmq data, this could change
// with each data block
......@@ -271,6 +314,10 @@ void *producer::frame_writer() {
<< dcuCycleStatus[ifo][j]
<< " dcuStatCycle="
<< dcuStatCycle[ifo][j] << endl);
std::cout << "dcuid=" << j << " dcuCycleStatus="
<< dcuCycleStatus[ifo][j]
<< " dcuStatCycle="
<< dcuStatCycle[ifo][j] << endl;
/* Check if DCU running and in sync */
if ((dcuCycleStatus[ifo][j] > 3 || j < 5) &&
......@@ -285,6 +332,11 @@ void *producer::frame_writer() {
<< "); status "
<< dcuCycleStatus[ifo][j]
<< dcuStatCycle[ifo][j] << endl);
std::cout << "Lost " << daqd.dcuName[j]
<< "(ifo " << ifo << "; dcu " << j
<< "); status "
<< dcuCycleStatus[ifo][j]
<< dcuStatCycle[ifo][j] << endl;
cur_dcu.status = DAQ_STATE_FAULT;
}
......@@ -310,7 +362,8 @@ void *producer::frame_writer() {
// Update DCU status
int newStatus = cur_dcu.status != DAQ_STATE_RUN ? 0xbad : 0;
std::cout << "newStatus = " << (hex) << newStatus << " cur_dcu.status = " << (dec) << cur_dcu.status << std::endl;
std::cout << "newStatus = " << (hex) << newStatus << " cur_dcu.status = " << (dec) << cur_dcu.status;
std::cout << " gps = " << cur_dcu.timeSec << " gps_n = " << cur_dcu.timeNSec << std::endl;
int newCrc = cur_dcu.fileCrc;
......@@ -366,8 +419,8 @@ void *producer::frame_writer() {
// %d\n", j, cblk, gmDaqIpc[j].bp[cblk].cycle, dcu_gps,
// gps);
unsigned long mygps = gps;
if (cblk > (15 - cycle_delay))
mygps--;
//if (cblk > (15 - cycle_delay))
// mygps--;
if (daqd.edcuFileStatus[j]) {
daqd.dcuStatus[0][j] |= 0x8000;
......@@ -471,17 +524,37 @@ void *producer::frame_writer() {
// prop.gps = time(0) - 315964819 + 33;
prop.gps = gps;
if (cblk > (15 - cycle_delay))
prop.gps--;
//if (cblk > (15 - cycle_delay))
// prop.gps--;
prop.gps_n = 1000000000 / 16 * (i % 16);
// printf("before put %d %d %d\n", prop.gps, prop.gps_n, frac);
prop.leap_seconds = daqd.gps_leap_seconds(prop.gps);
std::cout << "about to call put16th_dpscattered with " << vmic_pv_len << " entries. prop.gps = " << prop.gps << " prop.gps_n = " << prop.gps_n << "\n";
//for (int ii = 0; ii < vmic_pv_len; ++ii)
// std::cout << " " << *vmic_pv[ii].src_status_addr;
//std::cout << std::endl;
stat_transfer.sample();
int nbi = daqd.b1->put16th_dpscattered(vmic_pv, vmic_pv_len, &prop);
stat_transfer.tick();
{
circ_buffer_block_t* block_p = daqd.b1->block_prop(nbi);
std::cout << "block_p->prop.gps = " << block_p->prop.gps << " block_p->prop.gps_n = " << block_p->prop.gps_n << std::endl;
//if (block_p->prop.gps != prop.gps) {
// std::cout << "\n\nblock_p->prop.gps (" << block_p->prop.gps << ") != prop.gps (" << prop.gps << ")\n" << std::endl;
//}
//assert(block_p->prop.gps == prop.gps);
}
std::cout << "put16th_dpscattered returned " << nbi << std::endl;
std::cout << "drops: " << daqd.b1->drops() << " blocks: " << daqd.b1->blocks() << " puts: " << daqd.b1->num_puts()
<< " consumers: " << daqd.b1->get_cons_num() << std::endl;
// printf("%d %d\n", prop.gps, prop.gps_n);
// DEBUG1(cerr << "producer " << i << endl);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment