Chris@16: /* Chris@101: * Copyright Andrey Semashev 2007 - 2015. Chris@16: * Distributed under the Boost Software License, Version 1.0. Chris@16: * (See accompanying file LICENSE_1_0.txt or copy at Chris@16: * http://www.boost.org/LICENSE_1_0.txt) Chris@16: */ Chris@16: /*! Chris@16: * \file unbounded_ordering_queue.hpp Chris@16: * \author Andrey Semashev Chris@16: * \date 24.07.2011 Chris@16: * Chris@16: * The header contains implementation of unbounded ordering record queueing strategy for Chris@16: * the asynchronous sink frontend. Chris@16: */ Chris@16: Chris@16: #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ Chris@16: #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ Chris@16: Chris@16: #include Chris@16: Chris@16: #ifdef BOOST_HAS_PRAGMA_ONCE Chris@16: #pragma once Chris@16: #endif Chris@16: Chris@16: #if defined(BOOST_LOG_NO_THREADS) Chris@16: #error Boost.Log: This header content is only supported in multithreaded environment Chris@16: #endif Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@101: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: namespace boost { Chris@16: Chris@16: BOOST_LOG_OPEN_NAMESPACE Chris@16: Chris@16: namespace sinks { Chris@16: Chris@16: /*! Chris@16: * \brief Unbounded ordering log record queueing strategy Chris@16: * Chris@16: * The \c unbounded_ordering_queue class is intended to be used with Chris@16: * the \c asynchronous_sink frontend as a log record queueing strategy. Chris@16: * Chris@16: * This strategy provides the following properties to the record queueing mechanism: Chris@16: * Chris@16: * \li The queue has no size limits. Chris@16: * \li The queue has a fixed latency window. This means that each log record put Chris@16: * into the queue will normally not be dequeued for a certain period of time. Chris@16: * \li The queue performs stable record ordering within the latency window. Chris@16: * The ordering predicate can be specified in the \c OrderT template parameter. Chris@16: * Chris@16: * Since this queue has no size limits, it may grow uncontrollably if sink backends Chris@16: * dequeue log records not fast enough. When this is an issue, it is recommended to Chris@16: * use one of the bounded strategies. Chris@16: */ Chris@16: template< typename OrderT > Chris@16: class unbounded_ordering_queue Chris@16: { Chris@16: private: Chris@16: typedef boost::mutex mutex_type; Chris@101: typedef sinks::aux::enqueued_record enqueued_record; Chris@16: Chris@16: typedef std::priority_queue< Chris@16: enqueued_record, Chris@16: std::vector< enqueued_record >, Chris@101: enqueued_record::order< OrderT > Chris@16: > queue_type; Chris@16: Chris@16: private: Chris@16: //! Ordering window duration, in milliseconds Chris@16: const uint64_t m_ordering_window; Chris@16: //! Synchronization mutex Chris@16: mutex_type m_mutex; Chris@16: //! Condition for blocking Chris@16: condition_variable m_cond; Chris@16: //! Thread-safe queue Chris@16: queue_type m_queue; Chris@16: //! Interruption flag Chris@16: bool m_interruption_requested; Chris@16: Chris@16: public: Chris@16: /*! Chris@16: * Returns ordering window size specified during initialization Chris@16: */ Chris@16: posix_time::time_duration get_ordering_window() const Chris@16: { Chris@16: return posix_time::milliseconds(m_ordering_window); Chris@16: } Chris@16: Chris@16: /*! Chris@16: * Returns default ordering window size. Chris@16: * The default window size is specific to the operating system thread scheduling mechanism. Chris@16: */ Chris@16: static posix_time::time_duration get_default_ordering_window() Chris@16: { Chris@16: // The main idea behind this parameter is that the ordering window should be large enough Chris@16: // to allow the frontend to order records from different threads on an attribute Chris@16: // that contains system time. Thus this value should be: Chris@16: // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. Chris@16: // For instance, on Windows it defaults to around 15-16 ms. Chris@16: // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to Chris@16: // switch threads on any known OS. It can be tuned for other platforms as needed. Chris@16: return posix_time::milliseconds(30); Chris@16: } Chris@16: Chris@16: protected: Chris@16: //! Initializing constructor Chris@16: template< typename ArgsT > Chris@16: explicit unbounded_ordering_queue(ArgsT const& args) : Chris@16: m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()), Chris@16: m_queue(args[keywords::order]), Chris@16: m_interruption_requested(false) Chris@16: { Chris@16: } Chris@16: Chris@16: //! Enqueues log record to the queue Chris@16: void enqueue(record_view const& rec) Chris@16: { Chris@16: lock_guard< mutex_type > lock(m_mutex); Chris@16: enqueue_unlocked(rec); Chris@16: } Chris@16: Chris@16: //! Attempts to enqueue log record to the queue Chris@16: bool try_enqueue(record_view const& rec) Chris@16: { Chris@16: unique_lock< mutex_type > lock(m_mutex, try_to_lock); Chris@16: if (lock.owns_lock()) Chris@16: { Chris@16: enqueue_unlocked(rec); Chris@16: return true; Chris@16: } Chris@16: else Chris@16: return false; Chris@16: } Chris@16: Chris@16: //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed Chris@16: bool try_dequeue_ready(record_view& rec) Chris@16: { Chris@16: lock_guard< mutex_type > lock(m_mutex); Chris@16: if (!m_queue.empty()) Chris@16: { Chris@16: const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); Chris@16: enqueued_record const& elem = m_queue.top(); Chris@16: if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) Chris@16: { Chris@16: // We got a new element Chris@16: rec = elem.m_record; Chris@16: m_queue.pop(); Chris@16: return true; Chris@16: } Chris@16: } Chris@16: Chris@16: return false; Chris@16: } Chris@16: Chris@16: //! Attempts to dequeue log record from the queue, does not block. Chris@16: bool try_dequeue(record_view& rec) Chris@16: { Chris@16: lock_guard< mutex_type > lock(m_mutex); Chris@16: if (!m_queue.empty()) Chris@16: { Chris@16: enqueued_record const& elem = m_queue.top(); Chris@16: rec = elem.m_record; Chris@16: m_queue.pop(); Chris@16: return true; Chris@16: } Chris@16: Chris@16: return false; Chris@16: } Chris@16: Chris@16: //! Dequeues log record from the queue, blocks if no log records are ready to be processed Chris@16: bool dequeue_ready(record_view& rec) Chris@16: { Chris@16: unique_lock< mutex_type > lock(m_mutex); Chris@16: while (!m_interruption_requested) Chris@16: { Chris@16: if (!m_queue.empty()) Chris@16: { Chris@16: const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); Chris@16: enqueued_record const& elem = m_queue.top(); Chris@16: const uint64_t difference = (now - elem.m_timestamp).milliseconds(); Chris@16: if (difference >= m_ordering_window) Chris@16: { Chris@16: // We got a new element Chris@16: rec = elem.m_record; Chris@16: m_queue.pop(); Chris@16: return true; Chris@16: } Chris@16: else Chris@16: { Chris@16: // Wait until the element becomes ready to be processed Chris@16: m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); Chris@16: } Chris@16: } Chris@16: else Chris@16: { Chris@16: // Wait for an element to come Chris@16: m_cond.wait(lock); Chris@16: } Chris@16: } Chris@16: m_interruption_requested = false; Chris@16: Chris@16: return false; Chris@16: } Chris@16: Chris@16: //! Wakes a thread possibly blocked in the \c dequeue method Chris@16: void interrupt_dequeue() Chris@16: { Chris@16: lock_guard< mutex_type > lock(m_mutex); Chris@16: m_interruption_requested = true; Chris@16: m_cond.notify_one(); Chris@16: } Chris@16: Chris@16: private: Chris@16: //! Enqueues a log record Chris@16: void enqueue_unlocked(record_view const& rec) Chris@16: { Chris@16: const bool was_empty = m_queue.empty(); Chris@16: m_queue.push(enqueued_record(rec)); Chris@16: if (was_empty) Chris@16: m_cond.notify_one(); Chris@16: } Chris@16: }; Chris@16: Chris@16: } // namespace sinks Chris@16: Chris@16: BOOST_LOG_CLOSE_NAMESPACE // namespace log Chris@16: Chris@16: } // namespace boost Chris@16: Chris@16: #include Chris@16: Chris@16: #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_