Commit 5e80c1f7 authored by Jonathan Hanks's avatar Jonathan Hanks

Merge branch 'remove_zmq' into 'master'

Remove zmq

See merge request cds/advligorts!66
parents ea7f82fb d2750ac5
......@@ -23,7 +23,6 @@ FIND_PACKAGE(EPICSBase REQUIRED)
FIND_PACKAGE(FrameCPP REQUIRED)
FIND_PACKAGE(MX)
FIND_PACKAGE(OpenMX)
FIND_PACKAGE(ZMQ4)
FIND_PACKAGE(Dolphin)
FIND_PACKAGE(NDS2Client)
FIND_PACKAGE(Boost COMPONENTS system)
......
......@@ -20,7 +20,7 @@ via pkg-config
5. make -j 8
* Note on Boost.
FrameCPP 2.6+ and the standalone_edc uses the boost libraries. Boost is a hard requirement for the daqd. To install boost see (https://www.boost.org/doc/libs/1_69_0/more/getting_started/unix-variants.html)
FrameCPP 2.6+ and the standalone_edc uses the boost libraries. Boost is also hard requirement for the daqd, the edcu, and the run number server. To install boost see (https://www.boost.org/doc/libs/1_69_0/more/getting_started/unix-variants.html)
You may need to pass a parameter to cmake -DBOOST_ROOT=<boost install prefix> to help cmake find boost.
......@@ -37,11 +37,15 @@ awgtpman
daqd (if the compiler is new enough)
nds
dataviewer (in its many pieces)
mx_stream
omx_xmit
omx_recv
dix_xmit
dix_recv
cps_xmit
cps_recv
local_dc
run_number_server
the zmq_stream components
zmq_fe
zmq_rcv_ix_xmit_delay
standalone_edc
If you need to install a copy of cmake you can retrieve the source from kitware.
......@@ -65,40 +69,43 @@ You can now use cmake as '/opt/cmake-3.7.2/bin/cmake'
Building on Debian
The cmake development is primarily done on Debian (both 8 & 9). Though with an updated gentoo system it works there.
The cmake development is primarily done on Debian 10.
The LSCSoft and CDS debian repositories are used to provide some of the software. See the following resources for information in setting up these repositories:
The CDS debian repositories are used to provide some of the software. See the following resources for information in setting up these repositories:
https://wiki.ligo.org/Computing/DASWG/DebianJessie
http://apt.ligo-wa.caltech.edu/debian/README.txt
On Debian the following packages are used:
build-essential
cmake
bison
flex
libzmq3-dev
pkg-config
ldas-tools-al-dev
ldas-tools-framecpp-dev
epics-dev
libbz2-dev
libmotif-dev
libxpm-dev
libxt-dev
grace
pcaspy
libboost-all-dev (Required for FrameCPP >= 2.6.0)
You will also need to install
bison,
cmake,
debhelper (>= 9),
dkms,
dolphin-sisci-ix-devel,
epics-dev,
flex,
grace,
ldas-tools-al-dev,
ldas-tools-framecpp-dev,
libboost-all-dev,
libbz2-dev,
libc-dev-bin,
libcds-pubsub-dev,
libfl-dev,
libmotif-dev,
libnds2-client-dev,
libtool,
libxpm-dev,
libxt-dev,
libzmq3-dev,
pkg-config,
rapidjson-dev,
You will may also wish to instll
Open-MX
Dolphin
Open-MX is available in the CDS jessie-restricted repository (Debian 8) or may be built by hand.
On Debian 8 we have back ported the main zmq package from Debian 9 so that 4.2.1 is available on Debian 8, 9, and
the gentoo systems.
Open-MX is available in the CDS jessie-restricted repository (Debian 8) or may be built by hand. For Debian 10 it must be built by hand.
FE -> DAQD Transport
......@@ -110,10 +117,10 @@ local_dc and omx_xmit on the FE computers
omx_recv and dix_xmit on the data concentrator
dix_recv on the daqd machines
Using Zmq and IX Dolphin
Using CDS Pub/Sub and IX Dolphin
local_dc and zmq_xmit on the FE computers
zmq_recv and dix_xmit on the data concentrator
local_dc and cps_xmit on the FE computers
cps_recv and dix_xmit on the data concentrator
dix_recv on the daqd machines
......@@ -148,12 +155,12 @@ dix_recv -b local_dc -m 100 -g 0
-m the size of the buffer in MB
-g the IX memory window/group number to transfer data over
zmq_xmit -b local_dc -m 100 -e eth0
cps_xmit -b local_dc -m 100 -p "tcp://10.11.0.7:9000"
-b the name of the buffer to read data from
-m the size of the buffer in MB
-e the interface to publish data on
-p the publish method, in this example tcp unicast from 10.11.0.7:9000
zmq_recv -b local_dc -m 100 -s "x1susex x1lsc0 x1asc0"
cps_recv -b local_dc -m 100 -s "tcp://10.11.0.7:9000 tcp://10.11.0.11:9000"
-b the name of the buffer to write data to
-m the size of the buffer in MB
-s the systems to retrieve data from
......
if (cds_find_zmq4_included)
return()
endif(cds_find_zmq4_included)
set(cds_find_zmq4_included TRUE)
FIND_PACKAGE(PkgConfig)
pkg_check_modules(LibZMQ libzmq>=4.0.0)
if (LibZMQ_FOUND)
set (_fcpp_lib_list "")
find_library(libzmq4_LIBARY_PATH name zmq
PATHS ${LibZMQ_LIBRARY_DIRS}
)
add_library(_zmq4 SHARED IMPORTED)
set_target_properties(_zmq4 PROPERTIES
IMPORTED_LOCATION ${libzmq4_LIBARY_PATH})
add_library(_zmq4_intl INTERFACE)
target_include_directories(_zmq4_intl INTERFACE ${LibZMQ_INCLUDE_DIRS})
target_link_libraries(_zmq4_intl INTERFACE _zmq4)
find_file(_zmq4_hpp zmq.hpp
PATHS ${libZMQ_INCLUDE_DIRS})
if (NOT ${_zmq4_hpp})
target_include_directories(_zmq4_intl INTERFACE ${PROJECT_SOURCE_DIR}/src/include/thirdparty/zmq)
message("Adding ${PROJECT_SOURCE_DIR}/src/include/thirdparty/zmq to the include list for zmq4")
endif(NOT ${_zmq4_hpp})
# Give the interface library a nice name for exporting
add_library(zmq4::zmq ALIAS _zmq4_intl)
endif(LibZMQ_FOUND)
......@@ -12,12 +12,6 @@ add_subdirectory(local_dc)
add_subdirectory(fe_stream_test)
add_subdirectory(pub_sub_stream)
if (${LibZMQ_FOUND})
add_subdirectory(zmq_stream)
else (${LibZMQ_FOUND})
MESSAGE("Skipping zmq_stream")
endif (${LibZMQ_FOUND})
if (${DOLPHIN_FOUND})
add_subdirectory(ix_stream)
else (${DOLPHIN_FOUND})
......
......@@ -121,7 +121,6 @@ target_include_directories(daqd PUBLIC
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../include
${CMAKE_CURRENT_SOURCE_DIR}/../zmq_stream
${RPC_INCLUDE_DIRS}
)
target_link_libraries(daqd PUBLIC
......
......@@ -9,31 +9,3 @@ install(TARGETS dix_xmit DESTINATION bin)
add_executable(dix_recv dix_recv.c)
target_link_libraries(dix_recv PUBLIC driver::shmem dolphin::sisci)
install(TARGETS dix_recv DESTINATION bin)
#if (${MX_FOUND})
#
#add_executable(mx_rcv mx_rcv.cc dolphin_common.cc)
#target_compile_options(mx_rcv PRIVATE -O2)
#target_link_libraries(mx_rcv PUBLIC
# driver::shmem
# dolphin::sisci
# zmq::simple_pv
# mx::myriexpress
# ${CMAKE_THREAD_LIBS_INIT}
#)
#
#add_executable(mx_rcv_simple mx_rcv_simple.cc dolphin_common.cc)
#target_compile_options(mx_rcv_simple PRIVATE -O2)
#target_link_libraries(mx_rcv_simple PUBLIC
# driver::shmem
# dolphin::sisci
# zmq::simple_pv
# mx::myriexpress
# rt
# ${CMAKE_THREAD_LIBS_INIT}
#)
#
#
#endif (${MX_FOUND})
......@@ -24,7 +24,6 @@
#include <time.h>
#include "../include/daqmap.h"
#include "../include/daq_core.h"
#include "../zmq_stream/dc_utils.h"
#include "args.h"
// #include "testlib.h"
......
......@@ -30,7 +30,6 @@
#include <time.h>
#include "../include/daqmap.h"
#include "../include/daq_core.h"
#include "../zmq_stream/dc_utils.h"
// #include "testlib.h"
......
project(run_number)
cmake_minimum_required(VERSION 3.0.0)
set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../../config/cmake")
INCLUDE(CheckCXXCompilerFlag)
include(FindThreads)
find_package(ZMQ4)
find_path(CATCH_PATH catch.hpp
HINTS /usr/include /usr/local/include ${PROJECT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
CHECK_CXX_COMPILER_FLAG(-std=c++0x HAS_CXX_0X)
CHECK_CXX_COMPILER_FLAG(-std=c++11 HAS_CXX_11)
if (${HAS_CXX_11})
set(_RN_CHECK_FLAGS "-std=c++11")
elseif(${HAS_CXX_0X})
set(_RN_CHECK_FLAGS "-std=c++0x")
else(${HAS_CXX_11})
set(_RN_CHECK_FLAGS "")
endif (${HAS_CXX_11})
if (${LibZMQ_FOUND})
add_executable(run_number_server run_number_main.cc run_number.cc run_number.hh)
target_link_libraries(run_number_server PUBLIC zmq4::zmq)
target_compile_options(run_number_server PRIVATE ${_RN_CHECK_FLAGS})
add_library(run_number_client STATIC run_number_client.cc run_number_client.hh)
target_link_libraries(run_number_client PUBLIC zmq4::zmq)
target_include_directories(run_number_client PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
add_library(run_number::run_number ALIAS run_number_client)
add_executable(test_run_number tests/test_main.cc tests/test_run_number.cc tests/test_run_number_structs.cc run_number.cc run_number.hh)
target_include_directories(test_run_number PRIVATE ${CATCH_PATH})
target_compile_options(test_run_number PRIVATE ${_RN_CHECK_FLAGS})
add_executable(test_run_number_client tests/test_client_main.cc)
target_link_libraries(test_run_number_client PUBLIC run_number::run_number)
add_executable(run_number_test_client run_number_test_client.cc)
target_link_libraries(run_number_test_client PUBLIC run_number_client)
add_test(_rn_basic_run_number_tests_ test_run_number)
install(TARGETS run_number_server
DESTINATION bin
)
else (${LibZMQ_FOUND})
MESSAGE("ZeroMQ libraries not found, building the null run number client")
add_library(null_run_number_client STATIC null_run_number_client.cc run_number_client.hh)
target_include_directories(null_run_number_client PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
add_library(run_number::run_number ALIAS null_run_number_client)
endif (${LibZMQ_FOUND})
\ No newline at end of file
configure_file(tests/test_run_number_server.sh.in "${CMAKE_CURRENT_BINARY_DIR}/test_run_number_server.sh" @ONLY)
add_executable(run_number_server run_number_main.cc run_number.cc run_number.hh)
target_link_libraries(run_number_server PRIVATE
args
${Boost_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT})
target_include_directories(run_number_server PUBLIC ${Boost_INCLUDE_DIRS})
target_requires_cpp11(run_number_server PUBLIC)
add_library(run_number_client STATIC run_number_client.cc run_number_client.hh)
target_link_libraries(run_number_client PUBLIC ${Boost_LIBRARIES})
target_include_directories(run_number_client PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_include_directories(run_number_client PRIVATE ${Boost_INCLUDE_DIRS})
target_requires_cpp11(run_number_client PRIVATE)
add_library(run_number::run_number ALIAS run_number_client)
add_executable(test_run_number
tests/test_main.cc
tests/test_run_number.cc
tests/test_run_number_structs.cc
tests/test_run_number_internals.cc
run_number.cc
run_number.hh)
target_include_directories(test_run_number PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(test_run_number PRIVATE
${Boost_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT})
add_executable(test_run_number_client tests/test_client_main.cc)
target_link_libraries(test_run_number_client
PUBLIC run_number::run_number
${CMAKE_THREAD_LIBS_INIT})
add_executable(run_number_test_client run_number_test_client.cc)
target_link_libraries(run_number_test_client PUBLIC
args
run_number_client
${CMAKE_THREAD_LIBS_INIT})
add_test(_rn_basic_run_number_tests_ test_run_number)
add_test(NAME test_run_number_server
COMMAND /bin/bash ./test_run_number_server.sh
WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
install(TARGETS run_number_server
DESTINATION bin
)
#include <zmq.hpp>
#include "run_number_structs.h"
#include "run_number_client.hh"
#include <chrono>
#include "run_number_internal.hh"
namespace daqd_run_number
{
void
run_socket_timeout( tcp::socket& s, const std::chrono::seconds& timeout )
{
io_context_t& context = s.get_executor( ).context( );
context.restart( );
context.run_for( timeout );
if ( !context.stopped( ) )
{
s.close( );
context.run( );
}
else
{
context.restart( );
}
}
void
read_with_timeout( tcp::socket& s,
boost::asio::mutable_buffer dest,
std::chrono::seconds timeout )
{
bool read_done{ false };
io_context_t& context = s.get_executor( ).context( );
boost::asio::async_read(
s,
dest,
[&read_done, &context]( const boost::system::error_code& ec,
size_t bytes_read ) {
context.stop( );
if ( !ec )
{
read_done = true;
}
} );
run_socket_timeout( s, timeout );
if ( !read_done )
{
throw std::runtime_error( "Timeout or network error" );
}
}
void
write_with_timeout( tcp::socket& s,
boost::asio::const_buffer src,
std::chrono::seconds timeout )
{
bool write_done{ false };
io_context_t& context = s.get_executor( ).context( );
boost::asio::async_write(
s,
src,
[&write_done, &context]( const boost::system::error_code& ec,
size_t bytes_written ) {
context.stop( );
if ( !ec )
{
write_done = true;
}
} );
run_socket_timeout( s, timeout );
if ( !write_done )
{
throw std::runtime_error( "Timeout or network error" );
}
}
int
get_run_number( const std::string& target, const std::string& hash )
{
const int timeout = 10 * 1000;
const std::chrono::seconds timeout( 10 );
try
{
......@@ -16,42 +89,29 @@ namespace daqd_run_number
if ( hash.size( ) > sizeof( req.hash ) )
return 0;
auto target_address =
daqd_run_number::parse_connection_string( target );
io_context_t context;
tcp::resolver resolver( context );
tcp::resolver::results_type endpoints =
resolver.resolve( target_address.address, target_address.port );
tcp::socket socket( context );
boost::asio::connect( socket, endpoints );
req.version = 1;
req.hash_size = static_cast< short >( hash.size( ) );
memset( &req.hash[ 0 ], 0, sizeof( req.hash ) );
memcpy( &req.hash[ 0 ], hash.data( ), hash.size( ) );
zmq::context_t context( 1 );
zmq::socket_t requestor( context, ZMQ_REQ );
zmq::message_t request( sizeof( req ) );
memcpy( request.data( ), &req, sizeof( req ) );
int dummy = timeout;
#ifdef ZMQ_CONNECT_TIMEOUT
requestor.setsockopt(
ZMQ_CONNECT_TIMEOUT, &dummy, sizeof( dummy ) );
#endif
requestor.setsockopt( ZMQ_SNDTIMEO, &dummy, sizeof( dummy ) );
requestor.setsockopt( ZMQ_RCVTIMEO, &dummy, sizeof( dummy ) );
dummy = 0;
requestor.setsockopt( ZMQ_LINGER, &dummy, sizeof( dummy ) );
write_with_timeout( socket, to_const_buffer( req ), timeout );
requestor.connect( target.c_str( ) );
requestor.send( request );
daqd_run_number_resp_v1_t resp;
read_with_timeout( socket, to_buffer( resp ), timeout );
zmq::message_t resp_msg;
if ( !requestor.recv( &resp_msg ) )
return 0;
if ( resp_msg.size( ) != sizeof( daqd_run_number_resp_v1_t ) )
return 0;
daqd_run_number_resp_v1_t* resp =
reinterpret_cast< daqd_run_number_resp_v1_t* >(
resp_msg.data( ) );
if ( resp->version != 1 )
if ( resp.version != 1 )
return 0;
return resp->number;
return resp.number;
}
catch ( ... )
{
......
//
// Created by jonathan.hanks on 3/26/20.
//
#ifndef DAQD_TRUNK_RUN_NUMBER_INTERNAL_HH
#define DAQD_TRUNK_RUN_NUMBER_INTERNAL_HH
#include <boost/asio.hpp>
#include <boost/utility/string_view.hpp>
#include "run_number_structs.h"
#if BOOST_ASIO_VERSION < 101200
using io_context_t = boost::asio::io_service;
inline io_context_t&
get_context( boost::asio::ip::tcp::acceptor& acceptor )
{
return acceptor.get_io_service( );
}
inline boost::asio::ip::address
make_address( const char* str )
{
return boost::asio::ip::address::from_string( str );
}
#else
using io_context_t = boost::asio::io_context;
inline io_context_t&
get_context( boost::asio::ip::tcp::acceptor& acceptor )
{
return acceptor.get_executor( ).context( );
}
inline boost::asio::ip::address
make_address( const char* str )
{
return boost::asio::ip::make_address( str );
}
#endif
namespace daqd_run_number
{
using boost::asio::ip::tcp;
struct endpoint_address
{
std::string address{ "0.0.0.0" };
std::string port{ "5556" };
};
/*!
* @brief parse a connection string into an address/port pair.
* @param conn_str The address
* @return The parsed address
* @details The full form is address:port, the port is defaulted if not
* specified. It can also accept zmq style tcp://address:port.
* If "*" is passed as the address it is turned to "0.0.0.0".
* @note the default port is 5556
*/
inline endpoint_address
parse_connection_string( const std::string& conn_str )
{
static const std::string tcp_prefix = "tcp://";
endpoint_address results;
boost::string_view conn = conn_str;
if ( conn.starts_with( tcp_prefix ) )
{
conn = conn.substr( tcp_prefix.size( ) );
}
auto index = conn.find( ':' );
if ( index != boost::string_view::npos )
{
auto port = conn.substr( index + 1 );
if ( !port.empty( ) )
{
results.port = std::string( port.begin( ), port.end( ) );
}
}
auto address = conn.substr( 0, index );
results.address = std::string( address.begin( ), address.end( ) );
if ( results.address == "*" )
{
results.address = "0.0.0.0";
}
return results;
}
inline boost::asio::mutable_buffer
to_buffer( daqd_run_number_req_v1_t& req )
{
return boost::asio::buffer( &req, sizeof( req ) );
}
inline boost::asio::mutable_buffer
to_buffer( daqd_run_number_resp_v1_t& resp )
{
return boost::asio::buffer( &resp, sizeof( resp ) );
}
inline boost::asio::const_buffer
to_const_buffer( daqd_run_number_req_v1_t& req )
{
return boost::asio::buffer( &req, sizeof( req ) );
}
inline boost::asio::const_buffer
to_const_buffer( daqd_run_number_resp_v1_t& resp )
{
return boost::asio::buffer( &resp, sizeof( resp ) );
}
} // namespace daqd_run_number
#endif // DAQD_TRUNK_RUN_NUMBER_INTERNAL_HH
#include <iostream>
#include <string>
#include <vector>
#include <zmq.hpp>
#include <fstream>
#include "args.h"
#include "run_number.hh"
#include "run_number_structs.h"
#include "run_number_server.hh"
static const std::string default_db = "run-number.db";
static const std::string default_endpoint = "tcp://*:5556";
static const char* default_db = "run-number.db";
static const char* default_endpoint = "0.0.0.0:5556";
struct config
{
std::string db_path;
std::string endpoint;
bool verbose;
bool test_crash;
config( )
: db_path( default_db ), endpoint( default_endpoint ), verbose( false ),
test_crash( false )
{
}
config( const config& other )
: db_path( other.db_path ), endpoint( other.endpoint ),
verbose( other.verbose ), test_crash( other.test_crash )
{
}
config&
operator=( const config& other )
: db_path( default_db ), endpoint( default_endpoint ), verbose( false )
{
db_path = other.db_path;
endpoint = other.endpoint;
verbose = other.verbose;
test_crash = other.test_crash;
return *this;
}
config( const config& other ) = default;
config& operator=( const config& other ) = default;
};
void
send_zero_response( zmq::socket_t& s )
{
daqd_run_number_resp_v1_t resp;
bzero( &resp, sizeof( resp ) );
resp.version = 1;
}
bool
parse_args( int argc, char* argv[], config& cfg )
{
std::string prog_name( ( argc >= 1 ? argv[ 0 ] : "unknown" ) );
bool need_help = false;
bool endpoint_assigned = false;
for ( int i = 1; i < argc; ++i )
{
std::string arg( argv[ i ] );
if ( arg == "-v" )
{
cfg.verbose = true;
}
else if ( arg == "-h" || arg == "--help" )
{
need_help = true;
break;
}
else if ( arg == "--test-crash" )
{
cfg.test_crash = true;
break;
}
else if ( arg == "-f" || arg == "--file" )
{
if ( i + 1 >= argc )
{
need_help = true;
break;
}
cfg.db_path = argv[ i + 1 ];
i++;