diff DEPENDENCIES/generic/include/boost/log/sinks/unbounded_ordering_queue.hpp @ 16:2665513ce2d3

Add boost headers
author Chris Cannam
date Tue, 05 Aug 2014 11:11:38 +0100
parents
children c530137014c0
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/DEPENDENCIES/generic/include/boost/log/sinks/unbounded_ordering_queue.hpp	Tue Aug 05 11:11:38 2014 +0100
@@ -0,0 +1,298 @@
+/*
+ *          Copyright Andrey Semashev 2007 - 2013.
+ * Distributed under the Boost Software License, Version 1.0.
+ *    (See accompanying file LICENSE_1_0.txt or copy at
+ *          http://www.boost.org/LICENSE_1_0.txt)
+ */
+/*!
+ * \file   unbounded_ordering_queue.hpp
+ * \author Andrey Semashev
+ * \date   24.07.2011
+ *
+ * The header contains implementation of unbounded ordering record queueing strategy for
+ * the asynchronous sink frontend.
+ */
+
+#ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
+#define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
+
+#include <boost/log/detail/config.hpp>
+
+#ifdef BOOST_HAS_PRAGMA_ONCE
+#pragma once
+#endif
+
+#if defined(BOOST_LOG_NO_THREADS)
+#error Boost.Log: This header content is only supported in multithreaded environment
+#endif
+
+#include <queue>
+#include <vector>
+#include <boost/cstdint.hpp>
+#include <boost/move/core.hpp>
+#include <boost/move/utility.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/log/detail/timestamp.hpp>
+#include <boost/log/keywords/order.hpp>
+#include <boost/log/keywords/ordering_window.hpp>
+#include <boost/log/core/record_view.hpp>
+#include <boost/log/detail/header.hpp>
+
+namespace boost {
+
+BOOST_LOG_OPEN_NAMESPACE
+
+namespace sinks {
+
+/*!
+ * \brief Unbounded ordering log record queueing strategy
+ *
+ * The \c unbounded_ordering_queue class is intended to be used with
+ * the \c asynchronous_sink frontend as a log record queueing strategy.
+ *
+ * This strategy provides the following properties to the record queueing mechanism:
+ *
+ * \li The queue has no size limits.
+ * \li The queue has a fixed latency window. This means that each log record put
+ *     into the queue will normally not be dequeued for a certain period of time.
+ * \li The queue performs stable record ordering within the latency window.
+ *     The ordering predicate can be specified in the \c OrderT template parameter.
+ *
+ * Since this queue has no size limits, it may grow uncontrollably if sink backends
+ * dequeue log records not fast enough. When this is an issue, it is recommended to
+ * use one of the bounded strategies.
+ */
+template< typename OrderT >
+class unbounded_ordering_queue
+{
+private:
+    typedef boost::mutex mutex_type;
+
+    //! Log record with enqueueing timestamp
+    class enqueued_record
+    {
+        BOOST_COPYABLE_AND_MOVABLE(enqueued_record)
+
+    public:
+        //! Ordering predicate
+        struct order :
+            public OrderT
+        {
+            typedef typename OrderT::result_type result_type;
+
+            order() {}
+            order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {}
+            order(OrderT const& that) : OrderT(that) {}
+
+            result_type operator() (enqueued_record const& left, enqueued_record const& right) const
+            {
+                // std::priority_queue requires ordering with semantics of std::greater, so we swap arguments
+                return OrderT::operator() (right.m_record, left.m_record);
+            }
+        };
+
+        boost::log::aux::timestamp m_timestamp;
+        record_view m_record;
+
+        enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record)
+        {
+        }
+        enqueued_record(BOOST_RV_REF(enqueued_record) that) :
+            m_timestamp(that.m_timestamp),
+            m_record(boost::move(that.m_record))
+        {
+        }
+        explicit enqueued_record(record_view const& rec) :
+            m_timestamp(boost::log::aux::get_timestamp()),
+            m_record(rec)
+        {
+        }
+        enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that)
+        {
+            m_timestamp = that.m_timestamp;
+            m_record = that.m_record;
+            return *this;
+        }
+        enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that)
+        {
+            m_timestamp = that.m_timestamp;
+            m_record = boost::move(that.m_record);
+            return *this;
+        }
+    };
+
+    typedef std::priority_queue<
+        enqueued_record,
+        std::vector< enqueued_record >,
+        typename enqueued_record::order
+    > queue_type;
+
+private:
+    //! Ordering window duration, in milliseconds
+    const uint64_t m_ordering_window;
+    //! Synchronization mutex
+    mutex_type m_mutex;
+    //! Condition for blocking
+    condition_variable m_cond;
+    //! Thread-safe queue
+    queue_type m_queue;
+    //! Interruption flag
+    bool m_interruption_requested;
+
+public:
+    /*!
+     * Returns ordering window size specified during initialization
+     */
+    posix_time::time_duration get_ordering_window() const
+    {
+        return posix_time::milliseconds(m_ordering_window);
+    }
+
+    /*!
+     * Returns default ordering window size.
+     * The default window size is specific to the operating system thread scheduling mechanism.
+     */
+    static posix_time::time_duration get_default_ordering_window()
+    {
+        // The main idea behind this parameter is that the ordering window should be large enough
+        // to allow the frontend to order records from different threads on an attribute
+        // that contains system time. Thus this value should be:
+        // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
+        //   For instance, on Windows it defaults to around 15-16 ms.
+        // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
+        //   switch threads on any known OS. It can be tuned for other platforms as needed.
+        return posix_time::milliseconds(30);
+    }
+
+protected:
+    //! Initializing constructor
+    template< typename ArgsT >
+    explicit unbounded_ordering_queue(ArgsT const& args) :
+        m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
+        m_queue(args[keywords::order]),
+        m_interruption_requested(false)
+    {
+    }
+
+    //! Enqueues log record to the queue
+    void enqueue(record_view const& rec)
+    {
+        lock_guard< mutex_type > lock(m_mutex);
+        enqueue_unlocked(rec);
+    }
+
+    //! Attempts to enqueue log record to the queue
+    bool try_enqueue(record_view const& rec)
+    {
+        unique_lock< mutex_type > lock(m_mutex, try_to_lock);
+        if (lock.owns_lock())
+        {
+            enqueue_unlocked(rec);
+            return true;
+        }
+        else
+            return false;
+    }
+
+    //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed
+    bool try_dequeue_ready(record_view& rec)
+    {
+        lock_guard< mutex_type > lock(m_mutex);
+        if (!m_queue.empty())
+        {
+            const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
+            enqueued_record const& elem = m_queue.top();
+            if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
+            {
+                // We got a new element
+                rec = elem.m_record;
+                m_queue.pop();
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    //! Attempts to dequeue log record from the queue, does not block.
+    bool try_dequeue(record_view& rec)
+    {
+        lock_guard< mutex_type > lock(m_mutex);
+        if (!m_queue.empty())
+        {
+            enqueued_record const& elem = m_queue.top();
+            rec = elem.m_record;
+            m_queue.pop();
+            return true;
+        }
+
+        return false;
+    }
+
+    //! Dequeues log record from the queue, blocks if no log records are ready to be processed
+    bool dequeue_ready(record_view& rec)
+    {
+        unique_lock< mutex_type > lock(m_mutex);
+        while (!m_interruption_requested)
+        {
+            if (!m_queue.empty())
+            {
+                const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
+                enqueued_record const& elem = m_queue.top();
+                const uint64_t difference = (now - elem.m_timestamp).milliseconds();
+                if (difference >= m_ordering_window)
+                {
+                    // We got a new element
+                    rec = elem.m_record;
+                    m_queue.pop();
+                    return true;
+                }
+                else
+                {
+                    // Wait until the element becomes ready to be processed
+                    m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
+                }
+            }
+            else
+            {
+                // Wait for an element to come
+                m_cond.wait(lock);
+            }
+        }
+        m_interruption_requested = false;
+
+        return false;
+    }
+
+    //! Wakes a thread possibly blocked in the \c dequeue method
+    void interrupt_dequeue()
+    {
+        lock_guard< mutex_type > lock(m_mutex);
+        m_interruption_requested = true;
+        m_cond.notify_one();
+    }
+
+private:
+    //! Enqueues a log record
+    void enqueue_unlocked(record_view const& rec)
+    {
+        const bool was_empty = m_queue.empty();
+        m_queue.push(enqueued_record(rec));
+        if (was_empty)
+            m_cond.notify_one();
+    }
+};
+
+} // namespace sinks
+
+BOOST_LOG_CLOSE_NAMESPACE // namespace log
+
+} // namespace boost
+
+#include <boost/log/detail/footer.hpp>
+
+#endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_