annotate DEPENDENCIES/generic/include/boost/log/sinks/bounded_ordering_queue.hpp @ 49:666a1c41ce51

Package up binaries
author Chris Cannam
date Thu, 07 Aug 2014 19:17:03 +0100
parents 2665513ce2d3
children c530137014c0
rev   line source
Chris@16 1 /*
Chris@16 2 * Copyright Andrey Semashev 2007 - 2013.
Chris@16 3 * Distributed under the Boost Software License, Version 1.0.
Chris@16 4 * (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 * http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6 */
Chris@16 7 /*!
Chris@16 8 * \file bounded_ordering_queue.hpp
Chris@16 9 * \author Andrey Semashev
Chris@16 10 * \date 06.01.2012
Chris@16 11 *
Chris@16 12 * The header contains implementation of bounded ordering queueing strategy for
Chris@16 13 * the asynchronous sink frontend.
Chris@16 14 */
Chris@16 15
Chris@16 16 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 17 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 18
Chris@16 19 #include <boost/log/detail/config.hpp>
Chris@16 20
Chris@16 21 #ifdef BOOST_HAS_PRAGMA_ONCE
Chris@16 22 #pragma once
Chris@16 23 #endif
Chris@16 24
Chris@16 25 #if defined(BOOST_LOG_NO_THREADS)
Chris@16 26 #error Boost.Log: This header content is only supported in multithreaded environment
Chris@16 27 #endif
Chris@16 28
Chris@16 29 #include <cstddef>
Chris@16 30 #include <queue>
Chris@16 31 #include <vector>
Chris@16 32 #include <boost/cstdint.hpp>
Chris@16 33 #include <boost/move/core.hpp>
Chris@16 34 #include <boost/move/utility.hpp>
Chris@16 35 #include <boost/thread/locks.hpp>
Chris@16 36 #include <boost/thread/mutex.hpp>
Chris@16 37 #include <boost/thread/condition_variable.hpp>
Chris@16 38 #include <boost/thread/thread_time.hpp>
Chris@16 39 #include <boost/date_time/posix_time/posix_time_types.hpp>
Chris@16 40 #include <boost/log/detail/timestamp.hpp>
Chris@16 41 #include <boost/log/keywords/order.hpp>
Chris@16 42 #include <boost/log/keywords/ordering_window.hpp>
Chris@16 43 #include <boost/log/core/record_view.hpp>
Chris@16 44 #include <boost/log/detail/header.hpp>
Chris@16 45
Chris@16 46 namespace boost {
Chris@16 47
Chris@16 48 BOOST_LOG_OPEN_NAMESPACE
Chris@16 49
Chris@16 50 namespace sinks {
Chris@16 51
Chris@16 52 /*!
Chris@16 53 * \brief Bounded ordering log record queueing strategy
Chris@16 54 *
Chris@16 55 * The \c bounded_ordering_queue class is intended to be used with
Chris@16 56 * the \c asynchronous_sink frontend as a log record queueing strategy.
Chris@16 57 *
Chris@16 58 * This strategy provides the following properties to the record queueing mechanism:
Chris@16 59 *
Chris@16 60 * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter.
Chris@16 61 * \li Upon reaching the size limit, the queue invokes the overflow handling strategy
Chris@16 62 * specified in the \c OverflowStrategyT template parameter to handle the situation.
Chris@16 63 * The library provides overflow handling strategies for most common cases:
Chris@16 64 * \c drop_on_overflow will silently discard the log record, and \c block_on_overflow
Chris@16 65 * will put the enqueueing thread to wait until there is space in the queue.
Chris@16 66 * \li The queue has a fixed latency window. This means that each log record put
Chris@16 67 * into the queue will normally not be dequeued for a certain period of time.
Chris@16 68 * \li The queue performs stable record ordering within the latency window.
Chris@16 69 * The ordering predicate can be specified in the \c OrderT template parameter.
Chris@16 70 */
Chris@16 71 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
Chris@16 72 class bounded_ordering_queue :
Chris@16 73 private OverflowStrategyT
Chris@16 74 {
Chris@16 75 private:
Chris@16 76 typedef OverflowStrategyT overflow_strategy;
Chris@16 77 typedef boost::mutex mutex_type;
Chris@16 78
Chris@16 79 //! Log record with enqueueing timestamp
Chris@16 80 class enqueued_record
Chris@16 81 {
Chris@16 82 BOOST_COPYABLE_AND_MOVABLE(enqueued_record)
Chris@16 83
Chris@16 84 public:
Chris@16 85 //! Ordering predicate
Chris@16 86 struct order :
Chris@16 87 public OrderT
Chris@16 88 {
Chris@16 89 typedef typename OrderT::result_type result_type;
Chris@16 90
Chris@16 91 order() {}
Chris@16 92 order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {}
Chris@16 93 order(OrderT const& that) : OrderT(that) {}
Chris@16 94
Chris@16 95 result_type operator() (enqueued_record const& left, enqueued_record const& right) const
Chris@16 96 {
Chris@16 97 // std::priority_queue requires ordering with semantics of std::greater, so we swap arguments
Chris@16 98 return OrderT::operator() (right.m_record, left.m_record);
Chris@16 99 }
Chris@16 100 };
Chris@16 101
Chris@16 102 boost::log::aux::timestamp m_timestamp;
Chris@16 103 record_view m_record;
Chris@16 104
Chris@16 105 enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record)
Chris@16 106 {
Chris@16 107 }
Chris@16 108 enqueued_record(BOOST_RV_REF(enqueued_record) that) :
Chris@16 109 m_timestamp(that.m_timestamp),
Chris@16 110 m_record(boost::move(that.m_record))
Chris@16 111 {
Chris@16 112 }
Chris@16 113 explicit enqueued_record(record_view const& rec) :
Chris@16 114 m_timestamp(boost::log::aux::get_timestamp()),
Chris@16 115 m_record(rec)
Chris@16 116 {
Chris@16 117 }
Chris@16 118 enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that)
Chris@16 119 {
Chris@16 120 m_timestamp = that.m_timestamp;
Chris@16 121 m_record = that.m_record;
Chris@16 122 return *this;
Chris@16 123 }
Chris@16 124 enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that)
Chris@16 125 {
Chris@16 126 m_timestamp = that.m_timestamp;
Chris@16 127 m_record = boost::move(that.m_record);
Chris@16 128 return *this;
Chris@16 129 }
Chris@16 130 };
Chris@16 131
Chris@16 132 typedef std::priority_queue<
Chris@16 133 enqueued_record,
Chris@16 134 std::vector< enqueued_record >,
Chris@16 135 typename enqueued_record::order
Chris@16 136 > queue_type;
Chris@16 137
Chris@16 138 private:
Chris@16 139 //! Ordering window duration, in milliseconds
Chris@16 140 const uint64_t m_ordering_window;
Chris@16 141 //! Synchronization primitive
Chris@16 142 mutex_type m_mutex;
Chris@16 143 //! Condition to block the consuming thread on
Chris@16 144 condition_variable m_cond;
Chris@16 145 //! Log record queue
Chris@16 146 queue_type m_queue;
Chris@16 147 //! Interruption flag
Chris@16 148 bool m_interruption_requested;
Chris@16 149
Chris@16 150 public:
Chris@16 151 /*!
Chris@16 152 * Returns ordering window size specified during initialization
Chris@16 153 */
Chris@16 154 posix_time::time_duration get_ordering_window() const
Chris@16 155 {
Chris@16 156 return posix_time::milliseconds(m_ordering_window);
Chris@16 157 }
Chris@16 158
Chris@16 159 /*!
Chris@16 160 * Returns default ordering window size.
Chris@16 161 * The default window size is specific to the operating system thread scheduling mechanism.
Chris@16 162 */
Chris@16 163 static posix_time::time_duration get_default_ordering_window()
Chris@16 164 {
Chris@16 165 // The main idea behind this parameter is that the ordering window should be large enough
Chris@16 166 // to allow the frontend to order records from different threads on an attribute
Chris@16 167 // that contains system time. Thus this value should be:
Chris@16 168 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
Chris@16 169 // For instance, on Windows it defaults to around 15-16 ms.
Chris@16 170 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
Chris@16 171 // switch threads on any known OS. It can be tuned for other platforms as needed.
Chris@16 172 return posix_time::milliseconds(30);
Chris@16 173 }
Chris@16 174
Chris@16 175 protected:
Chris@16 176 //! Initializing constructor
Chris@16 177 template< typename ArgsT >
Chris@16 178 explicit bounded_ordering_queue(ArgsT const& args) :
Chris@16 179 m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
Chris@16 180 m_queue(args[keywords::order]),
Chris@16 181 m_interruption_requested(false)
Chris@16 182 {
Chris@16 183 }
Chris@16 184
Chris@16 185 //! Enqueues log record to the queue
Chris@16 186 void enqueue(record_view const& rec)
Chris@16 187 {
Chris@16 188 unique_lock< mutex_type > lock(m_mutex);
Chris@16 189 std::size_t size = m_queue.size();
Chris@16 190 for (; size >= MaxQueueSizeV; size = m_queue.size())
Chris@16 191 {
Chris@16 192 if (!overflow_strategy::on_overflow(rec, lock))
Chris@16 193 return;
Chris@16 194 }
Chris@16 195
Chris@16 196 m_queue.push(enqueued_record(rec));
Chris@16 197 if (size == 0)
Chris@16 198 m_cond.notify_one();
Chris@16 199 }
Chris@16 200
Chris@16 201 //! Attempts to enqueue log record to the queue
Chris@16 202 bool try_enqueue(record_view const& rec)
Chris@16 203 {
Chris@16 204 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
Chris@16 205 if (lock.owns_lock())
Chris@16 206 {
Chris@16 207 const std::size_t size = m_queue.size();
Chris@16 208
Chris@16 209 // Do not invoke the bounding strategy in case of overflow as it may block
Chris@16 210 if (size < MaxQueueSizeV)
Chris@16 211 {
Chris@16 212 m_queue.push(enqueued_record(rec));
Chris@16 213 if (size == 0)
Chris@16 214 m_cond.notify_one();
Chris@16 215 return true;
Chris@16 216 }
Chris@16 217 }
Chris@16 218
Chris@16 219 return false;
Chris@16 220 }
Chris@16 221
Chris@16 222 //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
Chris@16 223 bool try_dequeue_ready(record_view& rec)
Chris@16 224 {
Chris@16 225 lock_guard< mutex_type > lock(m_mutex);
Chris@16 226 const std::size_t size = m_queue.size();
Chris@16 227 if (size > 0)
Chris@16 228 {
Chris@16 229 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 230 enqueued_record const& elem = m_queue.top();
Chris@16 231 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
Chris@16 232 {
Chris@16 233 // We got a new element
Chris@16 234 rec = elem.m_record;
Chris@16 235 m_queue.pop();
Chris@16 236 if (size == MaxQueueSizeV)
Chris@16 237 overflow_strategy::on_queue_space_available();
Chris@16 238 return true;
Chris@16 239 }
Chris@16 240 }
Chris@16 241
Chris@16 242 return false;
Chris@16 243 }
Chris@16 244
Chris@16 245 //! Attempts to dequeue log record from the queue, does not block if the queue is empty
Chris@16 246 bool try_dequeue(record_view& rec)
Chris@16 247 {
Chris@16 248 lock_guard< mutex_type > lock(m_mutex);
Chris@16 249 const std::size_t size = m_queue.size();
Chris@16 250 if (size > 0)
Chris@16 251 {
Chris@16 252 enqueued_record const& elem = m_queue.top();
Chris@16 253 rec = elem.m_record;
Chris@16 254 m_queue.pop();
Chris@16 255 if (size == MaxQueueSizeV)
Chris@16 256 overflow_strategy::on_queue_space_available();
Chris@16 257 return true;
Chris@16 258 }
Chris@16 259
Chris@16 260 return false;
Chris@16 261 }
Chris@16 262
Chris@16 263 //! Dequeues log record from the queue, blocks if the queue is empty
Chris@16 264 bool dequeue_ready(record_view& rec)
Chris@16 265 {
Chris@16 266 unique_lock< mutex_type > lock(m_mutex);
Chris@16 267
Chris@16 268 while (!m_interruption_requested)
Chris@16 269 {
Chris@16 270 const std::size_t size = m_queue.size();
Chris@16 271 if (size > 0)
Chris@16 272 {
Chris@16 273 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 274 enqueued_record const& elem = m_queue.top();
Chris@16 275 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
Chris@16 276 if (difference >= m_ordering_window)
Chris@16 277 {
Chris@16 278 rec = elem.m_record;
Chris@16 279 m_queue.pop();
Chris@16 280 if (size == MaxQueueSizeV)
Chris@16 281 overflow_strategy::on_queue_space_available();
Chris@16 282 return true;
Chris@16 283 }
Chris@16 284 else
Chris@16 285 {
Chris@16 286 // Wait until the element becomes ready to be processed
Chris@16 287 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
Chris@16 288 }
Chris@16 289 }
Chris@16 290 else
Chris@16 291 {
Chris@16 292 m_cond.wait(lock);
Chris@16 293 }
Chris@16 294 }
Chris@16 295 m_interruption_requested = false;
Chris@16 296
Chris@16 297 return false;
Chris@16 298 }
Chris@16 299
Chris@16 300 //! Wakes a thread possibly blocked in the \c dequeue method
Chris@16 301 void interrupt_dequeue()
Chris@16 302 {
Chris@16 303 lock_guard< mutex_type > lock(m_mutex);
Chris@16 304 m_interruption_requested = true;
Chris@16 305 overflow_strategy::interrupt();
Chris@16 306 m_cond.notify_one();
Chris@16 307 }
Chris@16 308 };
Chris@16 309
Chris@16 310 } // namespace sinks
Chris@16 311
Chris@16 312 BOOST_LOG_CLOSE_NAMESPACE // namespace log
Chris@16 313
Chris@16 314 } // namespace boost
Chris@16 315
Chris@16 316 #include <boost/log/detail/footer.hpp>
Chris@16 317
Chris@16 318 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_