#ifndef NET_WRITER_HH #define NET_WRITER_HH #include "daqd.hh" /// Request processing. New net writer is created for each /// new data request. class net_writer_c : public s_link { private: pthread_mutex_t bm; void lock( void ) { pthread_mutex_lock( &bm ); } void unlock( void ) { pthread_mutex_unlock( &bm ); } class locker; friend class net_writer_c::locker; class locker { net_writer_c* dp; public: locker( net_writer_c* objp ) { ( dp = objp )->lock( ); } ~locker( ) { dp->unlock( ); } }; // This lock is used in the transient producer and consumer // to sync the access to the circular buffer with its destruction pthread_mutex_t tl; class transiency_locker; void transiency_lock( void ) { pthread_mutex_lock( &tl ); } void transiency_unlock( void ) { pthread_mutex_unlock( &tl ); } friend class net_writer_c::transiency_locker; class transiency_locker { net_writer_c* dp; public: transiency_locker( net_writer_c* objp ) { ( dp = objp )->transiency_lock( ); } ~transiency_locker( ) { dp->transiency_unlock( ); } }; public: typedef struct { bool inUse; char* data; } radio_buffer; int broadcast; int mcast_port; std::string mcast_interface; diag::frameSend radio; static const int radio_buf_len = 64 * 1024 * 1024; static const int radio_buf_num = 20; radio_buffer radio_bufs[ radio_buf_num ]; int* first_adc_ptr; static const int default_mcast_port = diag::frameXmitPort + 2; int clear_testpoints; // Scattered data vector type for data decimation struct dec_put_vec { unsigned long vec_idx; unsigned long vec_len; unsigned int vec_rate; unsigned int vec_bps; }; enum { slow_writer = 0, frame_writer = 1, fast_writer = 2, name_writer = 3 } writer_type; net_writer_c( int nid ) : num_channels( 0 ), shutdown_now( 0 ), buffptr( 0 ), block_size( 0 ), transmission_block_size( 0 ), transient( 0 ), pvec_len( 0 ), dec_vec_len( 0 ), writer_type( slow_writer ), seq_num( 0 ), channels( 0 ), pvec( 0 ), dec_vec( 0 ), s_link( nid ), broadcast( 0 ), mcast_interface( "" ), mcast_port( default_mcast_port ), radio( radio_buf_num - 1 ), clear_testpoints( 0 ), decimation_requested( false ), need_send_reconfig_block( false ), no_averaging( false ) { offline.seconds_per_sample = 1; pthread_mutex_init( &bm, NULL ); pthread_mutex_init( &tl, NULL ); for ( int i = 0; i < radio_buf_num; i++ ) { radio_bufs[ i ].data = 0; radio_bufs[ i ].inUse = false; } for ( int i = 0; i < 16; i++ ) { pvec16th[ i ] = 0; } }; ~net_writer_c( ) { DEBUG1( cerr << "net writer deleted\n" << endl ); pthread_mutex_destroy( &bm ); pthread_mutex_destroy( &tl ); free_vars( ); } /*! * @brief set the multicast/broadcast interface and port. * @param mcast_interface_and_port The interface (and optionally port) * to use as the multicast/broadcast. It is formatted as "interaface:port" * @note if a port is not specified it is defaulted to * default_mcast_port. */ void set_mcast_interface( const char* mcast_interface_and_port ); // pthread_mutex_t lock; int cnum; /* Consumer number */ void free_vars( ) { for ( int i = 0; i < 16; i++ ) { if ( pvec16th[ i ] ) free( pvec16th[ i ] ); pvec16th[ i ] = 0; } if ( channels ) free( channels ); channels = 0; if ( pvec ) free( pvec ); pvec = 0; if ( dec_vec ) free( dec_vec ); dec_vec = 0; } bool vars_fine( ) { bool r = true; for ( int i = 0; i < 16; i++ ) { r &= (bool)( pvec16th[ i ] ); } r &= (bool)channels; r &= (bool)pvec; return r & (bool)dec_vec; } bool alloc_vars( unsigned int maxchan ) { channels = (long_channel_t*)malloc( sizeof( long_channel_t ) * maxchan ); pvec = (struct put_vec*)malloc( sizeof( struct put_vec ) * maxchan * 2 ); dec_vec = (struct dec_put_vec*)malloc( sizeof( struct dec_put_vec ) * maxchan ); for ( int i = 0; i < 16; i++ ) pvec16th[ i ] = (struct put_vec*)malloc( sizeof( struct put_vec ) * maxchan * 2 ); if ( !vars_fine( ) ) { free_vars( ); return false; } else return true; } int num_channels; ///< Size of "channels" array below long_channel_t* channels; ///< Data channels to send int pvec_len; ///< Number of elements in `pvec'/`pvec16th' // :IMPORTANT: max chan num times two struct put_vec* pvec; ///< Put vector prepared for `put_nowait_scattered()' ///< operation in producer // :IMPORTANT: max chan num times two struct put_vec* pvec16th[ 16 ]; ///< Put vector prepared for `put_nowait_scattered()' ///< operation in the fast producer int dec_vec_len; ///< Number of elements in `dec_vec' struct dec_put_vec* dec_vec; ///< Scattered vector used for data decimation int block_size; ///< net-writer data block size (sum of the sizes of the ///< configured channels) // size of the network transmission block (could be less than `block_size' // if there are decimated channels int transmission_block_size; bool decimation_requested; short averaging( short* v, int num ) { assert( num > 0 && num < SHRT_MAX ); long res = 0; for ( int i = 0; i < num; i++ ) res += v[ i ]; return (short)( res / num ); } int averaging( int* v, int num ) { assert( num > 0 && num < SHRT_MAX ); long long res = 0; for ( int i = 0; i < num; i++ ) res += v[ i ]; return (int)( res / num ); } float averaging( float* v, int num ) { assert( num > 0 && num < SHRT_MAX ); double res = 0; for ( int i = 0; i < num; i++ ) res += v[ i ]; return (float)( res / num ); } double averaging( double* v, int num ) { assert( num > 0 && num < SHRT_MAX ); long double res = 0; for ( int i = 0; i < num; i++ ) res += v[ i ]; return (double)( res / num ); } circ_buffer* buffptr; circ_buffer* source_buffptr; ///< Data feed filesys_c* fsd; ///< Time to filename map to get data from files pthread_t producer_tid; pthread_t consumer_tid; /* * Socket address, internet style. struct sockaddr_in { short sin_family; u_short sin_port; struct in_addr sin_addr; char sin_zero[8]; }; */ struct sockaddr_in srvr_addr; ///< Data connection address int fileno; ///< Data connection socket file descriptor int connect_srvr_addr( int ); ///< Connect to `srvr_addr' and set `fileno' socket int set_send_buf_size( int, int ); int disconnect_srvr_addr( ) { int res = 0; if ( fileno != -1 ) { system_log( 1, "connection closed on fd=%d", fileno ); res = close( fileno ); fileno = -1; } return res; } int send_reconfig_block( int* status_ptr, bool units_change, bool status_change ); int start_net_writer( ostream*, int, int, circ_buffer*, filesys_c*, time_t, time_t, int ); int kill_net_writer( ); void* producer( ); static void* producer_static( void* a ) { return ( (net_writer_c*)a )->producer( ); }; void* consumer( ); static void* consumer_static( void* a ) { { locker mon( (net_writer_c*)a ); } if ( ( (net_writer_c*)a ) ->send_client_header( DAQD_LENGTH_UNKNOWN ) ) // Send realtime data stream // indicator return NULL; return ( (net_writer_c*)a )->consumer( ); }; void* transient_producer( ); static void* transient_producer_static( void* a ) { return ( (net_writer_c*)a )->transient_producer( ); }; void* transient_consumer( ); static void* transient_consumer_static( void* a ) { { locker mon( (net_writer_c*)a ); } return ( (net_writer_c*)a )->transient_consumer( ); }; static void* frame_consumer_static( void* a ) { { locker mon( (net_writer_c*)a ); } return ( (net_writer_c*)a )->send_frames( ); }; void destroy( void ) { this->~net_writer_c( ); free( (void*)this ); }; int transient; ///< Set if this producer doesn't send data online /* Data used by transient net-writer. This sort of net-writer sends off-line data to the client. This data is for certain period. Period is set by `start net-writer <gps> <delta> ... ' command or `start net-writer <delta>' command. In the following structure `gps' and `delta' store parameters of `start net-writer' command. `start_net_writer' starts network writer producer thread which takes some data out of the main circular buffer and puts it in the network writer circular buffer. This data is for some perioad of time. This period is indicated by `bstart' and `blast'. Namely, `bstart' is the timestamp of the first available data block and `blast' is the timestamp of the last. The consumer thread will use this data to find out where to get the data from: some of the data will be coming from the frame files and some from the network writer circular buffer. */ struct { time_t gps; time_t delta; time_t bstart; time_t blast; time_t seconds_per_sample; // set to the period of one minute trend sample // for the minute trend; set to one otherwise int no_time_check; // set when time check should be bypassed, i.e. // unknown channel names present in the request } offline; int seq_num; bool need_send_reconfig_block; /// Is set by 'downsample' parameter to 'start net-writer' command bool no_averaging; /// Send some data to the client int send_to_client( char* data, unsigned long len, unsigned long gps = 0, int period = 0 ); /// Send ACK to the client + net-writer ID int send_positive_response( ) { int res; char sbuf[ 9 ]; // :TODO: must fix the net-writer ID sprintf( sbuf, "%08x", (unsigned int)0 ); if ( !( res = send_to_client( S_DAQD_OK, 4 ) ) ) return send_to_client( sbuf, 8 ); return res; } /// Send to the client transmission header (indicator of how many data /// blocks wil follow) int send_client_header( unsigned int blocks ) { // blocks = htonl (blocks); blocks = htonl( !!blocks ); // 1 for off-line; 0 for online return send_to_client( (char*)&blocks, sizeof( unsigned int ) ); } /// The time period is given by `offline.gps' and `offline.delta' int send_files( void ); int send_raw_files( void ); void* send_frames( void ); /// Send data transmission trailer int send_trailer( ) { unsigned int header[ 5 ]; // size of the transmission block (zero) minus size of this length word header[ 0 ] = htonl( 4 * sizeof( unsigned int ) ); header[ 1 ] = htonl( 0 ); header[ 2 ] = htonl( 0 ); header[ 3 ] = htonl( 0 ); header[ 4 ] = htonl( 0 ); if ( send_to_client( (char*)&header, 5 * sizeof( unsigned int ) ) ) return -1; DEBUG1( cerr << "net writer trailer sent" << endl ); return 0; } /// Determine IP address given socket file descriptor /// Returns IP on success (similar to inet_addr(3N)), `-1' on failure static int ip_fd( int fd ) { struct sockaddr_in paddr; socklen_t paddr_len = sizeof( paddr ); if ( getpeername( fd, (struct sockaddr*)&paddr, &paddr_len ) < 0 ) return -1; return (int)paddr.sin_addr.s_addr; } static struct in_addr ip_fd_in_addr( int fd ) { struct in_addr ia; ia.s_addr = ip_fd( fd ); return ia; } static char* ip_fd_ntoa( int fd, char* buf ) { char* istr = net_writer_c::inet_ntoa( ip_fd_in_addr( fd ), buf ); // istr = ((int) istr) != 0? istr: "unknown"; return istr; } /// This is here to substitute unrealiable system's inet_ntoa() function /// There were memory leaks detected inside of it by Purify static char* inet_ntoa( struct in_addr in, char* buf ) { unsigned char bt[ 4 ]; memcpy( bt, (const void*)&in.s_addr, 4 ); sprintf( buf, "%u.%u.%u.%u", bt[ 0 ], bt[ 1 ], bt[ 2 ], bt[ 3 ] ); return buf; } /// Match `str' against the regular expression to see if /// this is a valid IP address. If not try to resolv `str' into the IP /// address. static int get_inet_addr( char* str ) { // See if the string is IP address if ( !daqd_c::is_valid_ip_address( str ) ) { int error; struct hostent* hp; int ret; struct hostent hent; char buf[ 2048 ]; // Try to resolve name into IP address if ( gethostbyname_r( str, &hent, buf, 2048, &hp, &error ) ) return -1; (void)memcpy( &ret, *hent.h_addr_list, sizeof( ret ) ); return ret; } else return inet_addr( str ); } /// Get IP address from the string in format `127.0.0.1:9090' /// Returns IP (value of inet_addr(3N)) on success, `-1' if failed. /// Fails if there is no IP address in the `str' or if `inet_addr()' failed. /// Data in the string `str' must not be in the static storage. static int ip_str( char* str ) { char* nc = str - 1; // `:' separates IP address from the port number int res; if ( nc = strchr( str, ':' ) ) { *nc = 0; // Substitute `:' on `\000' for an instant, shouldn't be a // problem res = get_inet_addr( str ); *nc = ':'; } else // Consider whole string an IP address if there is no `:' in it res = get_inet_addr( str ); return res; } /// Get port number from the string in the format `127.0.0.1:9090' static int port_str( char* str ) { char* nc; if ( nc = strchr( str, ':' ) ) return htons( atoi( nc + 1 ) ); return htons( atoi( str ) ); } private: int shutdown_now; void shutdown_net_writer( ); void shutdown_buffer( ) { circ_buffer* oldb; oldb = buffptr; buffptr = 0; oldb->~circ_buffer( ); free( (void*)oldb ); shutdown_now = 0; } }; // class net_writer #endif