Chris@16: /* Chris@101: * Copyright Andrey Semashev 2007 - 2015. Chris@16: * Distributed under the Boost Software License, Version 1.0. Chris@16: * (See accompanying file LICENSE_1_0.txt or copy at Chris@16: * http://www.boost.org/LICENSE_1_0.txt) Chris@16: */ Chris@16: /*! Chris@16: * \file bounded_fifo_queue.hpp Chris@16: * \author Andrey Semashev Chris@16: * \date 04.01.2012 Chris@16: * Chris@16: * The header contains implementation of bounded FIFO queueing strategy for Chris@16: * the asynchronous sink frontend. Chris@16: */ Chris@16: Chris@16: #ifndef BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_ Chris@16: #define BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_ Chris@16: Chris@16: #include Chris@16: Chris@16: #ifdef BOOST_HAS_PRAGMA_ONCE Chris@16: #pragma once Chris@16: #endif Chris@16: Chris@16: #if defined(BOOST_LOG_NO_THREADS) Chris@16: #error Boost.Log: This header content is only supported in multithreaded environment Chris@16: #endif Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: namespace boost { Chris@16: Chris@16: BOOST_LOG_OPEN_NAMESPACE Chris@16: Chris@16: namespace sinks { Chris@16: Chris@16: /*! Chris@16: * \brief Bounded FIFO log record queueing strategy Chris@16: * Chris@16: * The \c bounded_fifo_queue class is intended to be used with Chris@16: * the \c asynchronous_sink frontend as a log record queueing strategy. Chris@16: * Chris@16: * This strategy describes log record queueing logic. Chris@16: * The queue has a limited capacity, upon reaching which the enqueue operation will Chris@16: * invoke the overflow handling strategy specified in the \c OverflowStrategyT Chris@16: * template parameter to handle the situation. The library provides overflow handling Chris@16: * strategies for most common cases: \c drop_on_overflow will silently discard the log record, Chris@16: * and \c block_on_overflow will put the enqueueing thread to wait until there is space Chris@16: * in the queue. Chris@16: * Chris@16: * The log record queue imposes no ordering over the queued Chris@16: * elements aside from the order in which they are enqueued. Chris@16: */ Chris@16: template< std::size_t MaxQueueSizeV, typename OverflowStrategyT > Chris@16: class bounded_fifo_queue : Chris@16: private OverflowStrategyT Chris@16: { Chris@16: private: Chris@16: typedef OverflowStrategyT overflow_strategy; Chris@16: typedef std::queue< record_view > queue_type; Chris@16: typedef boost::mutex mutex_type; Chris@16: Chris@16: private: Chris@16: //! Synchronization primitive Chris@16: mutex_type m_mutex; Chris@16: //! Condition to block the consuming thread on Chris@16: condition_variable m_cond; Chris@16: //! Log record queue Chris@16: queue_type m_queue; Chris@16: //! Interruption flag Chris@16: bool m_interruption_requested; Chris@16: Chris@16: protected: Chris@16: //! Default constructor Chris@16: bounded_fifo_queue() : m_interruption_requested(false) Chris@16: { Chris@16: } Chris@16: //! Initializing constructor Chris@16: template< typename ArgsT > Chris@16: explicit bounded_fifo_queue(ArgsT const&) : m_interruption_requested(false) Chris@16: { Chris@16: } Chris@16: Chris@16: //! Enqueues log record to the queue Chris@16: void enqueue(record_view const& rec) Chris@16: { Chris@16: unique_lock< mutex_type > lock(m_mutex); Chris@16: std::size_t size = m_queue.size(); Chris@16: for (; size >= MaxQueueSizeV; size = m_queue.size()) Chris@16: { Chris@16: if (!overflow_strategy::on_overflow(rec, lock)) Chris@16: return; Chris@16: } Chris@16: Chris@16: m_queue.push(rec); Chris@16: if (size == 0) Chris@16: m_cond.notify_one(); Chris@16: } Chris@16: Chris@16: //! Attempts to enqueue log record to the queue Chris@16: bool try_enqueue(record_view const& rec) Chris@16: { Chris@16: unique_lock< mutex_type > lock(m_mutex, try_to_lock); Chris@16: if (lock.owns_lock()) Chris@16: { Chris@16: const std::size_t size = m_queue.size(); Chris@16: Chris@16: // Do not invoke the bounding strategy in case of overflow as it may block Chris@16: if (size < MaxQueueSizeV) Chris@16: { Chris@16: m_queue.push(rec); Chris@16: if (size == 0) Chris@16: m_cond.notify_one(); Chris@16: return true; Chris@16: } Chris@16: } Chris@16: Chris@16: return false; Chris@16: } Chris@16: Chris@16: //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty Chris@16: bool try_dequeue_ready(record_view& rec) Chris@16: { Chris@16: return try_dequeue(rec); Chris@16: } Chris@16: Chris@16: //! Attempts to dequeue log record from the queue, does not block if the queue is empty Chris@16: bool try_dequeue(record_view& rec) Chris@16: { Chris@16: lock_guard< mutex_type > lock(m_mutex); Chris@16: const std::size_t size = m_queue.size(); Chris@16: if (size > 0) Chris@16: { Chris@16: rec.swap(m_queue.front()); Chris@16: m_queue.pop(); Chris@101: overflow_strategy::on_queue_space_available(); Chris@16: return true; Chris@16: } Chris@16: Chris@16: return false; Chris@16: } Chris@16: Chris@16: //! Dequeues log record from the queue, blocks if the queue is empty Chris@16: bool dequeue_ready(record_view& rec) Chris@16: { Chris@16: unique_lock< mutex_type > lock(m_mutex); Chris@16: Chris@16: while (!m_interruption_requested) Chris@16: { Chris@16: const std::size_t size = m_queue.size(); Chris@16: if (size > 0) Chris@16: { Chris@16: rec.swap(m_queue.front()); Chris@16: m_queue.pop(); Chris@101: overflow_strategy::on_queue_space_available(); Chris@16: return true; Chris@16: } Chris@16: else Chris@16: { Chris@16: m_cond.wait(lock); Chris@16: } Chris@16: } Chris@16: m_interruption_requested = false; Chris@16: Chris@16: return false; Chris@16: } Chris@16: Chris@16: //! Wakes a thread possibly blocked in the \c dequeue method Chris@16: void interrupt_dequeue() Chris@16: { Chris@16: lock_guard< mutex_type > lock(m_mutex); Chris@16: m_interruption_requested = true; Chris@16: overflow_strategy::interrupt(); Chris@16: m_cond.notify_one(); Chris@16: } Chris@16: }; Chris@16: Chris@16: } // namespace sinks Chris@16: Chris@16: BOOST_LOG_CLOSE_NAMESPACE // namespace log Chris@16: Chris@16: } // namespace boost Chris@16: Chris@16: #include Chris@16: Chris@16: #endif // BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_