Skip to content
Snippets Groups Projects
Commit 107cf9c2 authored by Erik von Reis's avatar Erik von Reis
Browse files

Merge branch 'daqd_start_writer_refactor' into 'master'

Refactoring of the daqd/comm.y code to move the start (net/file/...) writer...

See merge request cds/advligorts!211
parents a39e3436 ec3303f1
No related branches found
No related tags found
1 merge request!211Refactoring of the daqd/comm.y code to move the start (net/file/...) writer...
......@@ -2279,442 +2279,7 @@ start_trend_bailout:
/* IP address:port gps_seconds_start period_seconds */
| START WriterType OptionalTextExpression OptionalIntnum OptionalIntnum BroadcastOption DecimateOption {((my_lexer *) lexer) -> trend_channels = 0; ((my_lexer *) lexer) -> num_channels = 0; ((my_lexer *)lexer) -> error_code = 0;((my_lexer *) lexer) -> n_archive_channel_names = 0;} ChannelNames {
int no_data_connection;
ostream *yyout = ((my_lexer *)lexer)->get_yyout ();
int ifd = ((my_lexer *)lexer)->get_ifd ();
// Check if no main buffer
if (! daqd.b1) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_NO_MAIN << flush;
else
*yyout << "Main is not started" << endl;
} else if (daqd.offline_disabled && ($4 || $5)) { // Check if off-line data request is disabled
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_NO_OFFLINE << flush;
else
*yyout << "Off-line data request disabled" << endl;
} else
// Check errors in channel names here
if (((my_lexer *)lexer) -> error_code) {
if (((my_lexer *)lexer) -> strict)
*yyout << setw(4) << setfill ('0') << hex << ((my_lexer *) lexer) -> error_code << dec << flush;
else
*yyout << "Network writer is not started" << endl;
goto start_writer_bailout;
} else {
void *mptr = malloc (sizeof(net_writer_c));
if (!mptr) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_MALLOC << flush;
else
*yyout << "Virtual memory exhausted" << endl;
goto start_writer_bailout;
}
DEBUG1(cerr << "Creating net_writer tagged " << ifd << endl);
net_writer_c *nw = new (mptr) net_writer_c(ifd);
if (!nw -> alloc_vars($9? MAX_CHANNELS: my_lexer::max_channels)) {
if (((my_lexer *)lexer) -> strict) *yyout << S_DAQD_MALLOC << flush;
else *yyout << "Virtual memory exhausted" << endl;
nw->~net_writer_c ();
free ((void *) nw);
goto start_writer_bailout;
}
int errc;
#ifndef NO_BROADCAST
nw -> broadcast = (long) $6;
nw -> set_mcast_interface( $6 );
#endif
if ((unsigned long) $6 > 1)
free ($6);
nw -> no_averaging = ( $7 == DOWNSAMPLE );
system_log(1, "no_average=%d\n", nw -> no_averaging);
if (($2) == NAME_WRITER) {
// Bail out if frame saver is not running, no point to start it hanging around, i guess (???)
// Bail out on a request with any channel specs (only `all' is valid)
// Bail out on a request with any periods
// Bail out if there is no circular buffer, ie, if !daqd.fsd.cb
if (! daqd.frame_saver_tid) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_NO_OFFLINE << flush;
else
*yyout << "name writer is not started, since frame saver is not running" << endl;
}
if (($4) || ($5) || ! ($9)) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_ERROR << flush;
else
*yyout << "invalid command" << endl;
}
//FIXME
// Should I try to construct this circular buffer on the first request?
if (! daqd.fsd.cb) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_NOT_SUPPORTED << flush;
else
*yyout << "main filesys map doesn't have a circular buffer constructed" << endl;
}
if (! daqd.frame_saver_tid || ($4) || ($5) || !($9)|| !daqd.fsd.cb) {
nw -> ~net_writer_c ();
free ((void *) nw);
goto start_writer_bailout;
}
nw -> writer_type = net_writer_c::name_writer;
} else if (($2) == FRAME_WRITER) {
int not_supported = ($4) ^ ($5)
|| (($4) && ($5) && ! ($9)); // only format `start frame-writer 123123 10 all' supported
// Decimation is not supported
for (int i = 0; i < ((my_lexer *) lexer) -> num_channels; i++)
not_supported |= ((my_lexer *) lexer) -> channels [i].req_rate;
if (not_supported) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_NOT_SUPPORTED << flush;
else
*yyout << "frame write call format is not supported" << endl;
nw -> ~net_writer_c ();
free ((void *) nw);
goto start_writer_bailout;
}
nw -> writer_type = net_writer_c::frame_writer;
} else if (($2) == FAST_WRITER) {
if (($4) || ($5) ) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_ERROR << flush;
else
*yyout << "invalid command" << endl;
nw -> ~net_writer_c ();
free ((void *) nw);
goto start_writer_bailout;
}
nw -> writer_type = net_writer_c::fast_writer;
} else {
nw -> writer_type = net_writer_c::slow_writer;
}
// $3 is the string in the format `127.0.0.1:9090'
// If the string is not present -- use control connection for data transport
// If there is no IP address, usethe port and extract IP address
// from the request and open data connection to this address using specified port.
if ($3) {
long ip_addr = -1;
if (! daqd_c::is_valid_dec_number ($3))
ip_addr = net_writer_c::ip_str ($3);
// Extract IP address
if (ip_addr == -1) {
if ((ip_addr = net_writer_c::ip_fd (((my_lexer *)lexer) -> ifd)) == -1) {
if (((my_lexer *)lexer) -> strict)
*yyout << setw(4) << setfill ('0') << hex << DAQD_GETPEERNAME << dec << flush;
else {
*yyout << "Couldn't do getpeername(); errno=" << errno << endl;
}
nw -> ~net_writer_c ();
free ((void *) nw);
goto start_writer_bailout;
}
}
nw -> srvr_addr.sin_addr.s_addr = (u_long) ip_addr;
nw -> srvr_addr.sin_family = AF_INET;
nw -> srvr_addr.sin_port = net_writer_c::port_str ($3);
no_data_connection = 0;
} else
no_data_connection = 1;
if (nw -> writer_type == net_writer_c::name_writer) {
DEBUG1(cerr << "name writer on the frame files" << endl);
nw -> transmission_block_size = nw -> block_size = daqd.fsd.cb -> block_size ();
nw -> num_channels = 0;
nw -> pvec [0].vec_idx = 0;
nw -> pvec [0].vec_len = daqd.fsd.cb -> block_size ();
nw -> pvec_len = 1;
} else {
int fast_net_writer = nw -> writer_type == net_writer_c::fast_writer;
if ($9) {
DEBUG1(cerr << "all channels selected" << endl);
#if 0
memcpy (nw -> channels, daqd.channels,
sizeof (nw -> channels [0]) * daqd.num_channels);
#endif // 0
nw -> num_channels = daqd.num_channels
#ifdef GDS_TESTPOINTS
// Do not send gds channels to broadcaster for now
- daqd.num_gds_channels - daqd.num_gds_channel_aliases
#endif
;
unsigned int j = 0;
for (unsigned int i=0; i < daqd.num_channels; i++) {
#if 0
if (nw -> broadcast) {
if (daqd.channels [i].sample_rate > 16384
|| daqd.channels [i].data_type == _32bit_complex) {
nw -> num_channels--;
continue;
}
}
#endif
#ifdef GDS_TESTPOINTS
if ((!IS_GDS_ALIAS (daqd.channels [i]))
&& (!IS_GDS_SIGNAL (daqd.channels [i])))
#endif
nw -> channels [j++] = daqd.channels [i];
}
} else {
memcpy (nw -> channels, ((my_lexer *) lexer) -> channels, sizeof (nw -> channels [0]) * ((my_lexer *) lexer) -> num_channels);
nw -> num_channels = ((my_lexer *) lexer) -> num_channels;
}
nw -> block_size = 0;
DEBUG1(cerr << "num_channels=" << nw -> num_channels << endl);
if (fast_net_writer) {
// Check if any channel rate is less than 16
for (int i = 0; i < nw -> num_channels; i++)
if (nw -> channels -> req_rate && nw -> channels -> req_rate < 16) {
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_INVALID_CHANNEL_DATA_RATE << flush;
else
*yyout << "invalid channel data rate -- minimum rate for fast net-writer is 16" << endl;
nw -> ~net_writer_c ();
free ((void *) nw);
goto start_writer_bailout;
}
}
#ifdef GDS_TESTPOINTS
// See if there are any GDS alias channels
// If there are any, need to find real GDS
// channels carrying the data, then update
// the offset
{
int na = 0;
//char *alias[nw -> num_channels];
//unsigned int tpnum[nw -> num_channels];
long_channel_t *ac [nw -> num_channels];
channel_t *gds [nw -> num_channels];
for (int i = 0; i < nw -> num_channels; i++) {
//printf("channel %d tp_node=%d\n", i, nw -> channels [i].tp_node);
if (IS_GDS_ALIAS(nw -> channels [i])) {
ac [na] = nw -> channels + i;
//tpnum [na] = nw -> channels [i].chNum;
//alias [na] = nw -> channels [i].name;
na++;
}
}
if (na) {
int res = -1;
//res = daqd.gds.req_names (alias, tpnum, gds,na);
if (daqd.tp_allowed(net_writer_c::ip_fd(((my_lexer *)lexer) -> ifd)))
res = daqd.gds.req_tps(ac, gds, na);
if (res) {
//if (daqd.tp_allowed(net_writer_c::ip_fd(((my_lexer *)lexer) -> ifd)))
//daqd.gds.clear_tps(ac, na);
if (((my_lexer *)lexer) -> strict)
*yyout << S_DAQD_NOT_FOUND << flush;
else
*yyout << "failed to find GDS signal for one or more aliases" << endl;
nw -> ~net_writer_c ();
free ((void *) nw);
goto start_writer_bailout;
}
}
nw -> clear_testpoints = 1;
}
#endif // #ifdef GDS_TESTPOINTS
for (int i = 0; i < nw -> num_channels; i++) {
if (fast_net_writer) {
if (nw -> channels [i].req_rate)
nw -> transmission_block_size += nw -> channels [i].bps * nw -> channels [i].req_rate/16;
else
nw -> transmission_block_size += nw -> channels [i].bytes/16;
nw -> block_size += nw -> channels [i].bytes / 16;
for (int j = 0; j < 16; j++) {
nw -> pvec16th [j][nw -> pvec_len].vec_idx
= nw -> channels [i].offset + nw -> channels [i].bytes / 16 * j;
nw -> pvec16th [j][nw -> pvec_len].vec_len = nw -> channels [i].bytes / 16;
}
nw -> pvec_len++;
// Contruct another scattered data array to be used for data decimation
// in the net_writer_c::consumer()
// `dec_vec [].vec_idx' is the offset in the net-writer circ buffer data block
if (!nw -> dec_vec_len) { // Start the vector
nw -> dec_vec [0].vec_idx = 0;
nw -> dec_vec [0].vec_len = nw -> channels [i].bytes/16;
nw -> dec_vec [0].vec_rate = nw -> channels [i].req_rate/16;
nw -> dec_vec [0].vec_bps = nw -> channels [i].bps;
nw -> dec_vec_len++;
} else if (nw -> channels [i -1].req_rate == nw -> channels [i].req_rate
&& nw -> channels [i -1].bytes == nw -> channels [i].bytes
&& nw -> channels [i -1].bps == nw -> channels [i].bps) {
// Increase data element length for contiguous channels with the same rate, same requested rate
// and the same number bytes per sample.
// We should be able to treat these channels as one channel with the
// increased data rate.
nw -> dec_vec [nw -> dec_vec_len -1].vec_len += nw -> channels [i].bytes/16;
nw -> dec_vec [nw -> dec_vec_len -1].vec_rate += nw -> channels [i].req_rate/16;
} else {
// Start new vector if rate or BPS changes
nw -> dec_vec [nw -> dec_vec_len].vec_idx
= nw -> dec_vec [nw -> dec_vec_len -1].vec_idx
+ nw -> dec_vec [nw -> dec_vec_len -1].vec_len;
nw -> dec_vec [nw -> dec_vec_len].vec_len = nw -> channels [i].bytes/16;
nw -> dec_vec [nw -> dec_vec_len].vec_rate = nw -> channels [i].req_rate/16;
nw -> dec_vec [nw -> dec_vec_len].vec_bps = nw -> channels [i].bps;
nw -> dec_vec_len++;
}
} else {
if (nw -> channels [i].req_rate)
nw -> transmission_block_size += nw -> channels [i].bps * nw -> channels [i].req_rate;
else
nw -> transmission_block_size += nw -> channels [i].bytes;
nw -> block_size += nw -> channels [i].bytes;
// Construct put vector array
if (!nw -> pvec_len) { // Start vector
nw -> pvec [0].vec_idx = nw -> channels [i].offset;
nw -> pvec [0].vec_len = nw -> channels [i].bytes;
nw -> pvec_len++;
} else if (nw -> pvec [nw -> pvec_len -1].vec_idx + nw -> pvec [nw -> pvec_len -1].vec_len
== nw -> channels [i].offset) {
// Increase vector element length for contiguous channels
nw -> pvec [nw -> pvec_len -1].vec_len += nw -> channels [i].bytes;
} else { // Start new vector if there is a gap in channel sequence
nw -> pvec [nw -> pvec_len].vec_idx = nw -> channels [i].offset;
nw -> pvec [nw -> pvec_len].vec_len = nw -> channels [i].bytes;
nw -> pvec_len++;
}
// Contruct another scattered data array to be used for data decimation
// in the net_writer_c::consumer()
// `dec_vec [].vec_idx' is the offset in the net-writer circ buffer data block
if (!nw -> dec_vec_len) { // Start the vector
nw -> dec_vec [0].vec_idx = 0;
nw -> dec_vec [0].vec_len = nw -> channels [i].bytes;
nw -> dec_vec [0].vec_rate = nw -> channels [i].req_rate;
nw -> dec_vec [0].vec_bps = nw -> channels [i].bps;
nw -> dec_vec_len++;
} else if (nw -> channels [i -1].req_rate == nw -> channels [i].req_rate
&& nw -> channels [i -1].bytes == nw -> channels [i].bytes
&& nw -> channels [i -1].bps == nw -> channels [i].bps) {
// Increase data element length for contiguous channels with the same rate
// and the same number bytes per sample.
// We should be able to treat these channels as one channel with the
// increased data rate.
nw -> dec_vec [nw -> dec_vec_len -1].vec_len += nw -> channels [i].bytes;
nw -> dec_vec [nw -> dec_vec_len -1].vec_rate += nw -> channels [i].req_rate;
} else {
// Start new vector if rate or BPS changes
nw -> dec_vec [nw -> dec_vec_len].vec_idx
= nw -> dec_vec [nw -> dec_vec_len -1].vec_idx
+ nw -> dec_vec [nw -> dec_vec_len -1].vec_len;
nw -> dec_vec [nw -> dec_vec_len].vec_len = nw -> channels [i].bytes;
nw -> dec_vec [nw -> dec_vec_len].vec_rate = nw -> channels [i].req_rate;
nw -> dec_vec [nw -> dec_vec_len].vec_bps = nw -> channels [i].bps;
nw -> dec_vec_len++;
}
}
}
nw -> decimation_requested = !nw -> broadcast;
printf("decimation flag=%d\n", nw -> decimation_requested);
nw -> block_size += nw -> num_channels * sizeof(int);
if (fast_net_writer) {
if (!nw -> broadcast) {
unsigned long status_ptr = sizeof(int) + daqd.block_size
- 17 * sizeof(int) * daqd.num_channels;
for (int i = 0; i < nw -> num_channels; i++) {
for (int j = 0; j < 16; j++) {
nw -> pvec16th [j][nw -> pvec_len].vec_idx
= j*sizeof(int) + status_ptr + 17 * sizeof(int) * nw -> channels [i].seq_num;
nw -> pvec16th [j][nw -> pvec_len].vec_len = sizeof(int);
}
nw -> pvec_len++;
}
}
} else {
// append one element per channel to save the status word
//
unsigned long status_ptr = daqd.block_size - 17 * sizeof(int) * daqd.num_channels;
for (int i = 0; i < nw -> num_channels; i++) {
nw -> pvec [nw -> pvec_len].vec_idx = status_ptr + 17 * sizeof(int) * nw -> channels [i].seq_num;
nw -> pvec [nw -> pvec_len].vec_len = sizeof(int);
nw -> pvec_len++;
}
}
if (!fast_net_writer) {
DEBUG1(cerr << "pvec->vec_idx\tpvec->vec_len" << endl);
DEBUG1(for(int j=0; j < nw -> pvec_len; j++) cerr << nw -> pvec [j].vec_idx << '\t' << nw -> pvec [j].vec_len << endl);
}
DEBUG1(cerr << "dec_vec->vec_idx\tdec_vec->vec_len\tdec_vec->vec_rate\tdec_vec->vec_bps" << endl);
DEBUG1(for(int k=0; k < nw -> dec_vec_len; k++) cerr << nw -> dec_vec [k].vec_idx << '\t' << nw -> dec_vec [k].vec_len << '\t' << nw -> dec_vec [k].vec_rate << '\t' << nw -> dec_vec [k].vec_bps << endl);
} // not name writer
circ_buffer *this_cb;
if (nw -> writer_type == net_writer_c::name_writer)
this_cb = daqd.fsd.cb;
else {
this_cb = daqd.b1;
nw -> need_send_reconfig_block = true;
}
if (!(errc = nw -> start_net_writer (yyout, ((my_lexer *)lexer) -> ofd, no_data_connection,
this_cb, &daqd.fsd, $4, $5, 0))) {
if (! no_data_connection) {
if (((my_lexer *)lexer) -> strict) {
*yyout << S_DAQD_OK << flush;
// send writer ID
*yyout << setfill ('0') << setw (sizeof (unsigned long) * 2) << hex
<< (unsigned long) nw << dec << flush;
} else {
*yyout << "started" << endl;
*yyout << "writer id: " << (unsigned long) nw << endl;
}
}
} else { // a problem
nw -> ~net_writer_c ();
free ((void *) nw);
if (((my_lexer *)lexer) -> strict)
*yyout << setw(4) << setfill ('0') << hex << errc << dec << flush;
else
*yyout << "error " << errc << endl;
}
}
start_writer_bailout:
if ($3)
free ($3);
comm_impl::start_write_impl(lexer, (int)$2, $3, $4, $5, $6, (int)7, (int)$9);
}
| KILL NET_WRITER INTNUM {
......
This diff is collapsed.
......@@ -12,10 +12,18 @@
namespace comm_impl
{
extern void start_write_impl( void* lexer,
int writerType_2_,
char* optionalAddress_3_,
int optionalStart_4_,
int optionalStop_5_,
char* optionalBroadcast_6_,
int decimateOption_7_,
int channelNames_9_ );
extern void configure_channels_body_begin_end( );
}
} // namespace comm_impl
#define DAQD_TRUNK_COMM_IMPL_HH
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment