Skip to content
Snippets Groups Projects
Commit e9bb2436 authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

checkin of partial work on a zmq producer

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4416 6dcd42c9-f523-4c6d-aada-af552506706e
parent 52f5303c
No related branches found
No related tags found
No related merge requests found
......@@ -53,6 +53,7 @@ using namespace std;
#include "work_queue.hh"
#include <zmq.hpp>
#include "../zmq_stream/zmq_daq.h"
extern daqd_c daqd;
extern int shutdown_server();
......@@ -85,6 +86,28 @@ namespace {
const int DEBUG_THREAD_OUTPUT = 2;
const int CRC_THREAD_INPUT = 2;
const int CRC_THREAD_OUTPUT = 0;
int receive_zmq_broadcast(zmq::socket_t &socket, void *dest, size_t dest_size, int *seq, int *gps, int *gps_n)
{
if (!dest || dest_size < sizeof(daq_multi_dcu_data_t)) return -sizeof(daq_multi_dcu_data_t);
zmq::message_t msg;
while (!socket.recv(&msg)) {}
if (msg.size() > dest_size)
return -msg.size();
::memcpy(dest, msg.data(), msg.size());
daq_multi_dcu_data_t *tmp = reinterpret_cast<daq_multi_dcu_data_t*>(dest);
if (tmp->dcuTotalModels > 0) {
if (seq) *seq = tmp->zmqheader[0].cycle;
if (gps) *gps = tmp->zmqheader[0].timeSec;
if (gps_n) *gps_n = tmp->zmqheader[0].timeNSec;
} else {
if (seq) *seq = 0;
if (gps) *gps = 0;
if (gps_n) *gps_n = 0;
}
return copy_bytes;
}
}
......@@ -202,7 +225,7 @@ void *producer::frame_writer() {
char *bufptr = (char *)(cur_buffer->move_buf) - BROADCAST_HEADER_SIZE;
// Wait until start of a second
while (gps_n) {
int length = NDS->receive(bufptr, buflen, &seq, &gps, &gps_n);
int length = receive_zmq_broadcast(receiver, bufptr, buflen, &seq, &gps, &gps_n);
if (length < 0) {
printf("Allocated buffer too small; required %d, size %d\n",
-length, buflen);
......@@ -268,7 +291,7 @@ void *producer::frame_writer() {
bufptr = (char *)(cur_buffer->move_buf) - BROADCAST_HEADER_SIZE;
for (;;) {
int old_seq = seq;
int length = NDS->receive(bufptr, buflen, &seq, &gps, &gps_n);
int length = receive_zmq_broadcast(receiver, bufptr, buflen, &seq, &gps, &gps_n);
cur_buffer->length = length;
// DEBUG1(printf("%d %d %d %d\n", length, seq, gps, gps_n));
// Strangely we receiver duplicate blocks on solaris for some reason
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment