annotate DEPENDENCIES/generic/include/boost/log/sinks/unbounded_ordering_queue.hpp @ 69:300f9fa4b454

Restore erroneous subrepo
author Chris Cannam
date Thu, 16 Oct 2014 14:41:48 +0100
parents 2665513ce2d3
children c530137014c0
rev   line source
Chris@16 1 /*
Chris@16 2 * Copyright Andrey Semashev 2007 - 2013.
Chris@16 3 * Distributed under the Boost Software License, Version 1.0.
Chris@16 4 * (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 * http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6 */
Chris@16 7 /*!
Chris@16 8 * \file unbounded_ordering_queue.hpp
Chris@16 9 * \author Andrey Semashev
Chris@16 10 * \date 24.07.2011
Chris@16 11 *
Chris@16 12 * The header contains the implementation of the unbounded ordering record queueing strategy for
Chris@16 13 * the asynchronous sink frontend.
Chris@16 14 */
Chris@16 15
Chris@16 16 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 17 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 18
Chris@16 19 #include <boost/log/detail/config.hpp>
Chris@16 20
Chris@16 21 #ifdef BOOST_HAS_PRAGMA_ONCE
Chris@16 22 #pragma once
Chris@16 23 #endif
Chris@16 24
Chris@16 25 #if defined(BOOST_LOG_NO_THREADS)
Chris@16 26 #error Boost.Log: This header content is only supported in multithreaded environment
Chris@16 27 #endif
Chris@16 28
Chris@16 29 #include <queue>
Chris@16 30 #include <vector>
Chris@16 31 #include <boost/cstdint.hpp>
Chris@16 32 #include <boost/move/core.hpp>
Chris@16 33 #include <boost/move/utility.hpp>
Chris@16 34 #include <boost/thread/locks.hpp>
Chris@16 35 #include <boost/thread/mutex.hpp>
Chris@16 36 #include <boost/thread/condition_variable.hpp>
Chris@16 37 #include <boost/thread/thread_time.hpp>
Chris@16 38 #include <boost/date_time/posix_time/posix_time_types.hpp>
Chris@16 39 #include <boost/log/detail/timestamp.hpp>
Chris@16 40 #include <boost/log/keywords/order.hpp>
Chris@16 41 #include <boost/log/keywords/ordering_window.hpp>
Chris@16 42 #include <boost/log/core/record_view.hpp>
Chris@16 43 #include <boost/log/detail/header.hpp>
Chris@16 44
Chris@16 45 namespace boost {
Chris@16 46
Chris@16 47 BOOST_LOG_OPEN_NAMESPACE
Chris@16 48
Chris@16 49 namespace sinks {
Chris@16 50
Chris@16 51 /*!
Chris@16 52 * \brief Unbounded ordering log record queueing strategy
Chris@16 53 *
Chris@16 54 * The \c unbounded_ordering_queue class is intended to be used with
Chris@16 55 * the \c asynchronous_sink frontend as a log record queueing strategy.
Chris@16 56 *
Chris@16 57 * This strategy provides the following properties to the record queueing mechanism:
Chris@16 58 *
Chris@16 59 * \li The queue has no size limits.
Chris@16 60 * \li The queue has a fixed latency window. This means that each log record put
Chris@16 61 * into the queue will normally not be dequeued for a certain period of time.
Chris@16 62 * \li The queue performs stable record ordering within the latency window.
Chris@16 63 * The ordering predicate can be specified in the \c OrderT template parameter.
Chris@16 64 *
Chris@16 65 * Since this queue has no size limits, it may grow uncontrollably if sink backends
Chris@16 66 * do not dequeue log records fast enough. When this is an issue, it is recommended to
Chris@16 67 * use one of the bounded strategies.
Chris@16 68 */
Chris@16 69 template< typename OrderT > // OrderT: ordering functor applied to pairs of record_view; must expose result_type (used by the nested order struct)
Chris@16 70 class unbounded_ordering_queue
Chris@16 71 {
Chris@16 72 private:
Chris@16 73 typedef boost::mutex mutex_type; // type of the mutex that guards the queue state
Chris@16 74
Chris@16 75 //! Log record together with the timestamp captured when the wrapper is constructed for enqueueing
Chris@16 76 class enqueued_record
Chris@16 77 {
Chris@16 78 BOOST_COPYABLE_AND_MOVABLE(enqueued_record) // Boost.Move macro: enables copy and move emulation for this class
Chris@16 79
Chris@16 80 public:
Chris@16 81 //! Ordering predicate: adapts OrderT, which compares record views, so that it compares enqueued_record objects
Chris@16 82 struct order :
Chris@16 83 public OrderT
Chris@16 84 {
Chris@16 85 typedef typename OrderT::result_type result_type;
Chris@16 86
Chris@16 87 order() {}
Chris@16 88 order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {}
Chris@16 89 order(OrderT const& that) : OrderT(that) {} // converting constructor from a plain OrderT instance
Chris@16 90
Chris@16 91 result_type operator() (enqueued_record const& left, enqueued_record const& right) const
Chris@16 92 {
Chris@16 93 // std::priority_queue keeps the element that compares greatest on top, so the arguments are swapped to invert OrderT: the record that OrderT ranks first ends up on top
Chris@16 94 return OrderT::operator() (right.m_record, left.m_record);
Chris@16 95 }
Chris@16 96 };
Chris@16 97
Chris@16 98 boost::log::aux::timestamp m_timestamp; // time at which this wrapper was created from a record (see the explicit constructor)
Chris@16 99 record_view m_record; // the queued log record
Chris@16 100
Chris@16 101 enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record)
Chris@16 102 {
Chris@16 103 }
Chris@16 104 enqueued_record(BOOST_RV_REF(enqueued_record) that) :
Chris@16 105 m_timestamp(that.m_timestamp), // the timestamp is copied; only the record view is moved
Chris@16 106 m_record(boost::move(that.m_record))
Chris@16 107 {
Chris@16 108 }
Chris@16 109 explicit enqueued_record(record_view const& rec) :
Chris@16 110 m_timestamp(boost::log::aux::get_timestamp()), // stamp the record with the current time
Chris@16 111 m_record(rec)
Chris@16 112 {
Chris@16 113 }
Chris@16 114 enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that)
Chris@16 115 {
Chris@16 116 m_timestamp = that.m_timestamp;
Chris@16 117 m_record = that.m_record;
Chris@16 118 return *this;
Chris@16 119 }
Chris@16 120 enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that)
Chris@16 121 {
Chris@16 122 m_timestamp = that.m_timestamp; // the timestamp is copied; only the record view is moved
Chris@16 123 m_record = boost::move(that.m_record);
Chris@16 124 return *this;
Chris@16 125 }
Chris@16 126 };
Chris@16 127
Chris@16 128 typedef std::priority_queue<
Chris@16 129 enqueued_record,
Chris@16 130 std::vector< enqueued_record >,
Chris@16 131 typename enqueued_record::order
Chris@16 132 > queue_type; // priority queue of enqueued records, ordered by enqueued_record::order
Chris@16 133
Chris@16 134 private:
Chris@16 135 //! Ordering window duration, in milliseconds; fixed at construction
Chris@16 136 const uint64_t m_ordering_window;
Chris@16 137 //! Synchronization mutex; held by every member function of this class that touches m_queue or m_interruption_requested
Chris@16 138 mutex_type m_mutex;
Chris@16 139 //! Condition variable on which \c dequeue_ready blocks; notified by \c enqueue_unlocked and \c interrupt_dequeue
Chris@16 140 condition_variable m_cond;
Chris@16 141 //! Priority queue of pending records; not synchronized by itself, access is protected by m_mutex
Chris@16 142 queue_type m_queue;
Chris@16 143 //! Interruption flag: set by \c interrupt_dequeue, checked and then cleared by \c dequeue_ready
Chris@16 144 bool m_interruption_requested;
Chris@16 145
Chris@16 146 public:
Chris@16 147 /*!
Chris@16 148 * Returns the ordering window size specified during initialization (it is stored internally as a whole number of milliseconds)
Chris@16 149 */
Chris@16 150 posix_time::time_duration get_ordering_window() const
Chris@16 151 {
Chris@16 152 return posix_time::milliseconds(m_ordering_window);
Chris@16 153 }
Chris@16 154
Chris@16 155 /*!
Chris@16 156 * Returns the default ordering window size, used when no ordering window is passed to the constructor.
Chris@16 157 * The value is meant to cover the operating system thread scheduling granularity; it is currently a fixed 30 ms on every platform.
Chris@16 158 */
Chris@16 159 static posix_time::time_duration get_default_ordering_window()
Chris@16 160 {
Chris@16 161 // The main idea behind this parameter is that the ordering window should be large enough
Chris@16 162 // to allow the frontend to order records from different threads on an attribute
Chris@16 163 // that contains system time. Thus this value should be:
Chris@16 164 // * No less than the minimum time resolution quantum that Boost.DateTime provides on the current OS.
Chris@16 165 // For instance, on Windows it defaults to around 15-16 ms.
Chris@16 166 // * No less than the thread switching quantum on the current OS. For now 30 ms is a large enough window size to
Chris@16 167 // switch threads on any known OS. It can be tuned for other platforms as needed.
Chris@16 168 return posix_time::milliseconds(30);
Chris@16 169 }
Chris@16 170
Chris@16 171 protected:
Chris@16 172 //! Initializing constructor. \a args is a named-argument pack: \c keywords::ordering_window is read if present (otherwise \c get_default_ordering_window() supplies the value) and \c keywords::order provides the ordering predicate for the queue
Chris@16 173 template< typename ArgsT >
Chris@16 174 explicit unbounded_ordering_queue(ArgsT const& args) :
Chris@16 175 m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
Chris@16 176 m_queue(args[keywords::order]),
Chris@16 177 m_interruption_requested(false)
Chris@16 178 {
Chris@16 179 }
Chris@16 180
Chris@16 181 //! Enqueues log record to the queue; waits for the mutex if another thread currently holds it
Chris@16 182 void enqueue(record_view const& rec)
Chris@16 183 {
Chris@16 184 lock_guard< mutex_type > lock(m_mutex);
Chris@16 185 enqueue_unlocked(rec);
Chris@16 186 }
Chris@16 187
Chris@16 188 //! Attempts to enqueue log record to the queue; returns \c false without enqueueing if the mutex could not be acquired immediately, \c true otherwise
Chris@16 189 bool try_enqueue(record_view const& rec)
Chris@16 190 {
Chris@16 191 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
Chris@16 192 if (lock.owns_lock())
Chris@16 193 {
Chris@16 194 enqueue_unlocked(rec);
Chris@16 195 return true;
Chris@16 196 }
Chris@16 197 else
Chris@16 198 return false;
Chris@16 199 }
Chris@16 200
Chris@16 201 //! Attempts to dequeue a log record ready for processing from the queue; does not wait for records to arrive or become ready. Only the top record is examined: it is ready once at least the ordering window has elapsed since its timestamp. Returns \c true and stores the record in \a rec on success, \c false otherwise
Chris@16 202 bool try_dequeue_ready(record_view& rec)
Chris@16 203 {
Chris@16 204 lock_guard< mutex_type > lock(m_mutex);
Chris@16 205 if (!m_queue.empty())
Chris@16 206 {
Chris@16 207 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 208 enqueued_record const& elem = m_queue.top();
Chris@16 209 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
Chris@16 210 {
Chris@16 211 // The top record has stayed in the queue for the whole ordering window: copy it out before pop() invalidates the reference
Chris@16 212 rec = elem.m_record;
Chris@16 213 m_queue.pop();
Chris@16 214 return true;
Chris@16 215 }
Chris@16 216 }
Chris@16 217
Chris@16 218 return false;
Chris@16 219 }
Chris@16 220
Chris@16 221 //! Attempts to dequeue log record from the queue, does not wait for records. The ordering window is ignored: the top record is returned regardless of its age. Returns \c false if the queue is empty
Chris@16 222 bool try_dequeue(record_view& rec)
Chris@16 223 {
Chris@16 224 lock_guard< mutex_type > lock(m_mutex);
Chris@16 225 if (!m_queue.empty())
Chris@16 226 {
Chris@16 227 enqueued_record const& elem = m_queue.top();
Chris@16 228 rec = elem.m_record; // copy out before pop() invalidates the reference
Chris@16 229 m_queue.pop();
Chris@16 230 return true;
Chris@16 231 }
Chris@16 232
Chris@16 233 return false;
Chris@16 234 }
Chris@16 235
Chris@16 236 //! Dequeues log record from the queue, blocks if no log records are ready to be processed. Returns \c true with the record stored in \a rec, or \c false if the wait was ended by \c interrupt_dequeue
Chris@16 237 bool dequeue_ready(record_view& rec)
Chris@16 238 {
Chris@16 239 unique_lock< mutex_type > lock(m_mutex);
Chris@16 240 while (!m_interruption_requested)
Chris@16 241 {
Chris@16 242 if (!m_queue.empty())
Chris@16 243 {
Chris@16 244 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 245 enqueued_record const& elem = m_queue.top();
Chris@16 246 const uint64_t difference = (now - elem.m_timestamp).milliseconds(); // time elapsed since the top record's timestamp, in milliseconds
Chris@16 247 if (difference >= m_ordering_window)
Chris@16 248 {
Chris@16 249 // The top record has stayed in the queue for the whole ordering window: copy it out before pop() invalidates the reference
Chris@16 250 rec = elem.m_record;
Chris@16 251 m_queue.pop();
Chris@16 252 return true;
Chris@16 253 }
Chris@16 254 else
Chris@16 255 {
Chris@16 256 // Wait for the remainder of the ordering window (or an earlier notification); the loop then re-examines whichever record is on top at that point
Chris@16 257 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
Chris@16 258 }
Chris@16 259 }
Chris@16 260 else
Chris@16 261 {
Chris@16 262 // The queue is empty: wait until a record is enqueued or an interruption is requested
Chris@16 263 m_cond.wait(lock);
Chris@16 264 }
Chris@16 265 }
Chris@16 266 m_interruption_requested = false; // consume the interruption request so that the next call blocks normally
Chris@16 267
Chris@16 268 return false;
Chris@16 269 }
Chris@16 270
Chris@16 271 //! Wakes a thread possibly blocked in the \c dequeue_ready method: sets the interruption flag under the mutex and notifies one waiter. If no thread is waiting, the flag stays set and makes the next \c dequeue_ready call return \c false
Chris@16 272 void interrupt_dequeue()
Chris@16 273 {
Chris@16 274 lock_guard< mutex_type > lock(m_mutex);
Chris@16 275 m_interruption_requested = true;
Chris@16 276 m_cond.notify_one();
Chris@16 277 }
Chris@16 278
Chris@16 279 private:
Chris@16 280 //! Enqueues a log record; the caller must already hold m_mutex (see \c enqueue and \c try_enqueue)
Chris@16 281 void enqueue_unlocked(record_view const& rec)
Chris@16 282 {
Chris@16 283 const bool was_empty = m_queue.empty();
Chris@16 284 m_queue.push(enqueued_record(rec));
Chris@16 285 if (was_empty) // dequeue_ready waits without a timeout only while the queue is empty, so only that transition is signalled
Chris@16 286 m_cond.notify_one();
Chris@16 287 }
Chris@16 288 };
Chris@16 289
Chris@16 290 } // namespace sinks
Chris@16 291
Chris@16 292 BOOST_LOG_CLOSE_NAMESPACE // namespace log
Chris@16 293
Chris@16 294 } // namespace boost
Chris@16 295
Chris@16 296 #include <boost/log/detail/footer.hpp>
Chris@16 297
Chris@16 298 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_