Something went wrong on our end
-
Jonathan Hanks authoredJonathan Hanks authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
message_queue.hh 5.48 KiB
//
// Created by jonathan.hanks on 10/6/20.
//
#ifndef DAQD_TRUNK_CHANNEL_QUEUE_HH
#define DAQD_TRUNK_CHANNEL_QUEUE_HH
#include <condition_variable>
#include <thread>
#include <type_traits>
#include <boost/optional.hpp>
template < typename T, std::size_t MaxSize >
class Message_queue;
namespace detail
{
template < typename T, std::size_t MaxSize >
class mq_introspect
{
public:
using mq_type = Message_queue< T, MaxSize >;
explicit mq_introspect( const mq_type& mq ) noexcept : mq_{ mq }
{
}
mq_introspect( const mq_introspect& other ) noexcept = default;
typename mq_type::size_type
begin_index( ) const
{
return mq_.begin_;
}
typename mq_type::size_type
end_index( ) const
{
return mq_.end_;
}
private:
const mq_type& mq_;
};
template < typename T, std::size_t MaxSize >
mq_introspect< T, MaxSize >
introspect( const Message_queue< T, MaxSize >& mq )
{
return mq_introspect< T, MaxSize >( mq );
}
} // namespace detail
template < typename T, std::size_t MaxSize >
class Message_queue
{
using lock_type = std::unique_lock< std::mutex >;
using storage_type =
typename std::aligned_storage< sizeof( T ), alignof( T ) >::type;
friend class detail::mq_introspect< T, MaxSize >;
public:
using value_type = T;
static_assert( MaxSize > 0, "the MaxSize parameter must be > 0" );
static_assert( std::is_nothrow_move_constructible< T >::value == true,
"The type must be no through move constructable" );
static_assert( std::is_nothrow_move_assignable< T >::value == true,
"The type must be no throw move assignable" );
~Message_queue( )
{
while ( size_ > 0 )
{
T* src = reinterpret_cast< T* >( &storage_[ end_ ] );
end_ = decr( end_ );
--size_;
src->~T( );
}
}
using size_type = std::size_t;
size_type
size( ) const
{
lock_type l{ m_ };
return size_;
}
bool
empty( ) const
{
lock_type l{ m_ };
return size_ == 0;
}
constexpr size_type
capacity( ) const
{
return MaxSize;
}
template < typename... Args >
void
emplace( Args... args )
{
lock_type l{ m_ };
wait_for_free_space( l );
emplace_while_locked( std::forward< Args >( args )... );
}
template < typename Duration, typename... Args >
boost::optional< T >
emplace_with_timeout( const Duration duration, Args... args )
{
boost::optional< T > results = boost::none;
lock_type l{ m_ };
wait_for_free_space_with_timeout( l, duration );
if ( size_ < MaxSize )
{
emplace_while_locked( std::forward< Args >( args )... );
}
else
{
results = std::move( T( std::forward< Args >( args )... ) );
}
return results;
};
T
pop( ) noexcept
{
lock_type l{ m_ };
wait_for_non_empty( l );
return pop_while_locked( );
}
template < typename Duration >
boost::optional< T >
pop( const Duration& duration )
{
boost::optional< T > result = boost::none;
lock_type l{ m_ };
wait_for_non_empty_with_timeout( l, duration );
if ( size_ > 0 )
{
result = pop_while_locked( );
}
return result;
}
private:
template < typename... Args >
void
emplace_while_locked( Args... args )
{
T* dest = reinterpret_cast< T* >( &storage_[ begin_ ] );
new ( dest ) T( std::forward< Args >( args )... );
begin_ = decr( begin_ );
++size_;
item_added_flag_.notify_one( );
}
T
pop_while_locked( ) noexcept
{
T* src = reinterpret_cast< T* >( &storage_[ end_ ] );
end_ = decr( end_ );
--size_;
T tmp( std::move( *src ) );
src->~T( );
item_removed_flag_.notify_one( );
return std::move( tmp );
}
constexpr size_type
decr( size_type i ) const noexcept
{
return ( i == 0 ? MaxSize - 1 : i - 1 );
}
void
wait_for_free_space( lock_type& l )
{
while ( size_ == MaxSize )
{
item_removed_flag_.wait( l );
}
}
template < typename Duration >
void
wait_for_free_space_with_timeout( lock_type& l, const Duration& timeout )
{
if ( size_ < MaxSize )
{
return;
}
item_removed_flag_.wait_for(
l, timeout, [this]( ) -> bool { return size_ < MaxSize; } );
}
void
wait_for_non_empty( lock_type& l )
{
while ( size_ == 0 )
{
item_added_flag_.wait( l );
}
}
template < typename Duration >
void
wait_for_non_empty_with_timeout( lock_type& l, const Duration& timeout )
{
if ( size_ > 0 )
{
return;
}
item_added_flag_.wait_for(
l, timeout, [this]( ) -> bool { return size_ > 0; } );
}
mutable std::mutex m_{};
std::condition_variable item_added_flag_{};
std::condition_variable item_removed_flag_{};
storage_type storage_[ MaxSize ]{};
size_type size_{ 0 };
size_type begin_{ MaxSize - 1 };
size_type end_{ MaxSize - 1 };
};
#endif // DAQD_TRUNK_CHANNEL_QUEUE_HH