annotate DEPENDENCIES/generic/include/boost/log/sinks/unbounded_ordering_queue.hpp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 /*
Chris@101 2 * Copyright Andrey Semashev 2007 - 2015.
Chris@16 3 * Distributed under the Boost Software License, Version 1.0.
Chris@16 4 * (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 * http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6 */
Chris@16 7 /*!
Chris@16 8 * \file unbounded_ordering_queue.hpp
Chris@16 9 * \author Andrey Semashev
Chris@16 10 * \date 24.07.2011
Chris@16 11 *
Chris@16 12 * The header contains implementation of unbounded ordering record queueing strategy for
Chris@16 13 * the asynchronous sink frontend.
Chris@16 14 */
Chris@16 15
Chris@16 16 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 17 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 18
Chris@16 19 #include <boost/log/detail/config.hpp>
Chris@16 20
Chris@16 21 #ifdef BOOST_HAS_PRAGMA_ONCE
Chris@16 22 #pragma once
Chris@16 23 #endif
Chris@16 24
Chris@16 25 #if defined(BOOST_LOG_NO_THREADS)
Chris@16 26 #error Boost.Log: This header content is only supported in multithreaded environment
Chris@16 27 #endif
Chris@16 28
Chris@16 29 #include <queue>
Chris@16 30 #include <vector>
Chris@16 31 #include <boost/cstdint.hpp>
Chris@16 32 #include <boost/thread/locks.hpp>
Chris@16 33 #include <boost/thread/mutex.hpp>
Chris@16 34 #include <boost/thread/condition_variable.hpp>
Chris@16 35 #include <boost/thread/thread_time.hpp>
Chris@16 36 #include <boost/date_time/posix_time/posix_time_types.hpp>
Chris@16 37 #include <boost/log/detail/timestamp.hpp>
Chris@101 38 #include <boost/log/detail/enqueued_record.hpp>
Chris@16 39 #include <boost/log/keywords/order.hpp>
Chris@16 40 #include <boost/log/keywords/ordering_window.hpp>
Chris@16 41 #include <boost/log/core/record_view.hpp>
Chris@16 42 #include <boost/log/detail/header.hpp>
Chris@16 43
Chris@16 44 namespace boost {
Chris@16 45
Chris@16 46 BOOST_LOG_OPEN_NAMESPACE
Chris@16 47
Chris@16 48 namespace sinks {
Chris@16 49
Chris@16 50 /*!
Chris@16 51 * \brief Unbounded ordering log record queueing strategy
Chris@16 52 *
Chris@16 53 * The \c unbounded_ordering_queue class is intended to be used with
Chris@16 54 * the \c asynchronous_sink frontend as a log record queueing strategy.
Chris@16 55 *
Chris@16 56 * This strategy provides the following properties to the record queueing mechanism:
Chris@16 57 *
Chris@16 58 * \li The queue has no size limits.
Chris@16 59 * \li The queue has a fixed latency window. This means that each log record put
Chris@16 60 * into the queue will normally not be dequeued for a certain period of time.
Chris@16 61 * \li The queue performs stable record ordering within the latency window.
Chris@16 62 * The ordering predicate can be specified in the \c OrderT template parameter.
Chris@16 63 *
Chris@16 64 * Since this queue has no size limits, it may grow uncontrollably if sink backends
Chris@16 65 * dequeue log records not fast enough. When this is an issue, it is recommended to
Chris@16 66 * use one of the bounded strategies.
Chris@16 67 */
Chris@16 68 template< typename OrderT >
Chris@16 69 class unbounded_ordering_queue
Chris@16 70 {
Chris@16 71 private:
Chris@16 72 typedef boost::mutex mutex_type;
Chris@101 73 typedef sinks::aux::enqueued_record enqueued_record;
Chris@16 74
Chris@16 75 typedef std::priority_queue<
Chris@16 76 enqueued_record,
Chris@16 77 std::vector< enqueued_record >,
Chris@101 78 enqueued_record::order< OrderT >
Chris@16 79 > queue_type;
Chris@16 80
Chris@16 81 private:
Chris@16 82 //! Ordering window duration, in milliseconds
Chris@16 83 const uint64_t m_ordering_window;
Chris@16 84 //! Synchronization mutex
Chris@16 85 mutex_type m_mutex;
Chris@16 86 //! Condition for blocking
Chris@16 87 condition_variable m_cond;
Chris@16 88 //! Thread-safe queue
Chris@16 89 queue_type m_queue;
Chris@16 90 //! Interruption flag
Chris@16 91 bool m_interruption_requested;
Chris@16 92
Chris@16 93 public:
Chris@16 94 /*!
Chris@16 95 * Returns ordering window size specified during initialization
Chris@16 96 */
Chris@16 97 posix_time::time_duration get_ordering_window() const
Chris@16 98 {
Chris@16 99 return posix_time::milliseconds(m_ordering_window);
Chris@16 100 }
Chris@16 101
Chris@16 102 /*!
Chris@16 103 * Returns default ordering window size.
Chris@16 104 * The default window size is specific to the operating system thread scheduling mechanism.
Chris@16 105 */
Chris@16 106 static posix_time::time_duration get_default_ordering_window()
Chris@16 107 {
Chris@16 108 // The main idea behind this parameter is that the ordering window should be large enough
Chris@16 109 // to allow the frontend to order records from different threads on an attribute
Chris@16 110 // that contains system time. Thus this value should be:
Chris@16 111 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
Chris@16 112 // For instance, on Windows it defaults to around 15-16 ms.
Chris@16 113 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
Chris@16 114 // switch threads on any known OS. It can be tuned for other platforms as needed.
Chris@16 115 return posix_time::milliseconds(30);
Chris@16 116 }
Chris@16 117
Chris@16 118 protected:
Chris@16 119 //! Initializing constructor
Chris@16 120 template< typename ArgsT >
Chris@16 121 explicit unbounded_ordering_queue(ArgsT const& args) :
Chris@16 122 m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
Chris@16 123 m_queue(args[keywords::order]),
Chris@16 124 m_interruption_requested(false)
Chris@16 125 {
Chris@16 126 }
Chris@16 127
Chris@16 128 //! Enqueues log record to the queue
Chris@16 129 void enqueue(record_view const& rec)
Chris@16 130 {
Chris@16 131 lock_guard< mutex_type > lock(m_mutex);
Chris@16 132 enqueue_unlocked(rec);
Chris@16 133 }
Chris@16 134
Chris@16 135 //! Attempts to enqueue log record to the queue
Chris@16 136 bool try_enqueue(record_view const& rec)
Chris@16 137 {
Chris@16 138 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
Chris@16 139 if (lock.owns_lock())
Chris@16 140 {
Chris@16 141 enqueue_unlocked(rec);
Chris@16 142 return true;
Chris@16 143 }
Chris@16 144 else
Chris@16 145 return false;
Chris@16 146 }
Chris@16 147
Chris@16 148 //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed
Chris@16 149 bool try_dequeue_ready(record_view& rec)
Chris@16 150 {
Chris@16 151 lock_guard< mutex_type > lock(m_mutex);
Chris@16 152 if (!m_queue.empty())
Chris@16 153 {
Chris@16 154 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 155 enqueued_record const& elem = m_queue.top();
Chris@16 156 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
Chris@16 157 {
Chris@16 158 // We got a new element
Chris@16 159 rec = elem.m_record;
Chris@16 160 m_queue.pop();
Chris@16 161 return true;
Chris@16 162 }
Chris@16 163 }
Chris@16 164
Chris@16 165 return false;
Chris@16 166 }
Chris@16 167
Chris@16 168 //! Attempts to dequeue log record from the queue, does not block.
Chris@16 169 bool try_dequeue(record_view& rec)
Chris@16 170 {
Chris@16 171 lock_guard< mutex_type > lock(m_mutex);
Chris@16 172 if (!m_queue.empty())
Chris@16 173 {
Chris@16 174 enqueued_record const& elem = m_queue.top();
Chris@16 175 rec = elem.m_record;
Chris@16 176 m_queue.pop();
Chris@16 177 return true;
Chris@16 178 }
Chris@16 179
Chris@16 180 return false;
Chris@16 181 }
Chris@16 182
Chris@16 183 //! Dequeues log record from the queue, blocks if no log records are ready to be processed
Chris@16 184 bool dequeue_ready(record_view& rec)
Chris@16 185 {
Chris@16 186 unique_lock< mutex_type > lock(m_mutex);
Chris@16 187 while (!m_interruption_requested)
Chris@16 188 {
Chris@16 189 if (!m_queue.empty())
Chris@16 190 {
Chris@16 191 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 192 enqueued_record const& elem = m_queue.top();
Chris@16 193 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
Chris@16 194 if (difference >= m_ordering_window)
Chris@16 195 {
Chris@16 196 // We got a new element
Chris@16 197 rec = elem.m_record;
Chris@16 198 m_queue.pop();
Chris@16 199 return true;
Chris@16 200 }
Chris@16 201 else
Chris@16 202 {
Chris@16 203 // Wait until the element becomes ready to be processed
Chris@16 204 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
Chris@16 205 }
Chris@16 206 }
Chris@16 207 else
Chris@16 208 {
Chris@16 209 // Wait for an element to come
Chris@16 210 m_cond.wait(lock);
Chris@16 211 }
Chris@16 212 }
Chris@16 213 m_interruption_requested = false;
Chris@16 214
Chris@16 215 return false;
Chris@16 216 }
Chris@16 217
Chris@16 218 //! Wakes a thread possibly blocked in the \c dequeue method
Chris@16 219 void interrupt_dequeue()
Chris@16 220 {
Chris@16 221 lock_guard< mutex_type > lock(m_mutex);
Chris@16 222 m_interruption_requested = true;
Chris@16 223 m_cond.notify_one();
Chris@16 224 }
Chris@16 225
Chris@16 226 private:
Chris@16 227 //! Enqueues a log record
Chris@16 228 void enqueue_unlocked(record_view const& rec)
Chris@16 229 {
Chris@16 230 const bool was_empty = m_queue.empty();
Chris@16 231 m_queue.push(enqueued_record(rec));
Chris@16 232 if (was_empty)
Chris@16 233 m_cond.notify_one();
Chris@16 234 }
Chris@16 235 };
Chris@16 236
Chris@16 237 } // namespace sinks
Chris@16 238
Chris@16 239 BOOST_LOG_CLOSE_NAMESPACE // namespace log
Chris@16 240
Chris@16 241 } // namespace boost
Chris@16 242
Chris@16 243 #include <boost/log/detail/footer.hpp>
Chris@16 244
Chris@16 245 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_