Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
message_queue.hh 5.48 KiB
//
// Created by jonathan.hanks on 10/6/20.
//

#ifndef DAQD_TRUNK_CHANNEL_QUEUE_HH
#define DAQD_TRUNK_CHANNEL_QUEUE_HH

#include <condition_variable>
#include <thread>
#include <type_traits>

#include <boost/optional.hpp>

template < typename T, std::size_t MaxSize >
class Message_queue;

namespace detail
{
    template < typename T, std::size_t MaxSize >
    class mq_introspect
    {
    public:
        using mq_type = Message_queue< T, MaxSize >;
        explicit mq_introspect( const mq_type& mq ) noexcept : mq_{ mq }
        {
        }
        mq_introspect( const mq_introspect& other ) noexcept = default;

        typename mq_type::size_type
        begin_index( ) const
        {
            return mq_.begin_;
        }
        typename mq_type::size_type
        end_index( ) const
        {
            return mq_.end_;
        }

    private:
        const mq_type& mq_;
    };

    template < typename T, std::size_t MaxSize >
    mq_introspect< T, MaxSize >
    introspect( const Message_queue< T, MaxSize >& mq )
    {
        return mq_introspect< T, MaxSize >( mq );
    }
} // namespace detail

template < typename T, std::size_t MaxSize >
class Message_queue
{
    using lock_type = std::unique_lock< std::mutex >;
    using storage_type =
        typename std::aligned_storage< sizeof( T ), alignof( T ) >::type;

    friend class detail::mq_introspect< T, MaxSize >;

public:
    using value_type = T;

    static_assert( MaxSize > 0, "the MaxSize parameter must be > 0" );
    static_assert( std::is_nothrow_move_constructible< T >::value == true,
                   "The type must be no through move constructable" );
    static_assert( std::is_nothrow_move_assignable< T >::value == true,
                   "The type must be no throw move assignable" );

    ~Message_queue( )
    {
        while ( size_ > 0 )
        {
            T* src = reinterpret_cast< T* >( &storage_[ end_ ] );
            end_ = decr( end_ );
            --size_;
            src->~T( );
        }
    }

    using size_type = std::size_t;

    size_type
    size( ) const
    {
        lock_type l{ m_ };
        return size_;
    }

    bool
    empty( ) const
    {
        lock_type l{ m_ };
        return size_ == 0;
    }

    constexpr size_type
              capacity( ) const
    {
        return MaxSize;
    }

    template < typename... Args >
    void
    emplace( Args... args )
    {
        lock_type l{ m_ };
        wait_for_free_space( l );
        emplace_while_locked( std::forward< Args >( args )... );
    }

    template < typename Duration, typename... Args >
    boost::optional< T >
    emplace_with_timeout( const Duration duration, Args... args )
    {
        boost::optional< T > results = boost::none;

        lock_type l{ m_ };
        wait_for_free_space_with_timeout( l, duration );
        if ( size_ < MaxSize )
        {
            emplace_while_locked( std::forward< Args >( args )... );
        }
        else
        {
            results = std::move( T( std::forward< Args >( args )... ) );
        }
        return results;
    };

    T
    pop( ) noexcept
    {
        lock_type l{ m_ };
        wait_for_non_empty( l );

        return pop_while_locked( );
    }

    template < typename Duration >
    boost::optional< T >
    pop( const Duration& duration )
    {
        boost::optional< T > result = boost::none;

        lock_type l{ m_ };
        wait_for_non_empty_with_timeout( l, duration );
        if ( size_ > 0 )
        {
            result = pop_while_locked( );
        }
        return result;
    }

private:
    template < typename... Args >
    void
    emplace_while_locked( Args... args )
    {
        T* dest = reinterpret_cast< T* >( &storage_[ begin_ ] );
        new ( dest ) T( std::forward< Args >( args )... );
        begin_ = decr( begin_ );
        ++size_;
        item_added_flag_.notify_one( );
    }

    T
    pop_while_locked( ) noexcept
    {
        T* src = reinterpret_cast< T* >( &storage_[ end_ ] );
        end_ = decr( end_ );
        --size_;
        T tmp( std::move( *src ) );
        src->~T( );
        item_removed_flag_.notify_one( );
        return std::move( tmp );
    }

    constexpr size_type
              decr( size_type i ) const noexcept
    {
        return ( i == 0 ? MaxSize - 1 : i - 1 );
    }

    void
    wait_for_free_space( lock_type& l )
    {
        while ( size_ == MaxSize )
        {
            item_removed_flag_.wait( l );
        }
    }

    template < typename Duration >
    void
    wait_for_free_space_with_timeout( lock_type& l, const Duration& timeout )
    {
        if ( size_ < MaxSize )
        {
            return;
        }
        item_removed_flag_.wait_for(
            l, timeout, [this]( ) -> bool { return size_ < MaxSize; } );
    }

    void
    wait_for_non_empty( lock_type& l )
    {
        while ( size_ == 0 )
        {
            item_added_flag_.wait( l );
        }
    }

    template < typename Duration >
    void
    wait_for_non_empty_with_timeout( lock_type& l, const Duration& timeout )
    {
        if ( size_ > 0 )
        {
            return;
        }
        item_added_flag_.wait_for(
            l, timeout, [this]( ) -> bool { return size_ > 0; } );
    }

    mutable std::mutex      m_{};
    std::condition_variable item_added_flag_{};
    std::condition_variable item_removed_flag_{};
    storage_type            storage_[ MaxSize ]{};
    size_type               size_{ 0 };
    size_type               begin_{ MaxSize - 1 };
    size_type               end_{ MaxSize - 1 };
};

#endif // DAQD_TRUNK_CHANNEL_QUEUE_HH