annotate DEPENDENCIES/generic/include/boost/log/sinks/bounded_ordering_queue.hpp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 /*
Chris@101 2 * Copyright Andrey Semashev 2007 - 2015.
Chris@16 3 * Distributed under the Boost Software License, Version 1.0.
Chris@16 4 * (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 * http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6 */
Chris@16 7 /*!
Chris@16 8 * \file bounded_ordering_queue.hpp
Chris@16 9 * \author Andrey Semashev
Chris@16 10 * \date 06.01.2012
Chris@16 11 *
Chris@16 12 * The header contains implementation of bounded ordering queueing strategy for
Chris@16 13 * the asynchronous sink frontend.
Chris@16 14 */
Chris@16 15
Chris@16 16 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 17 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
Chris@16 18
Chris@16 19 #include <boost/log/detail/config.hpp>
Chris@16 20
Chris@16 21 #ifdef BOOST_HAS_PRAGMA_ONCE
Chris@16 22 #pragma once
Chris@16 23 #endif
Chris@16 24
Chris@16 25 #if defined(BOOST_LOG_NO_THREADS)
Chris@16 26 #error Boost.Log: This header content is only supported in multithreaded environment
Chris@16 27 #endif
Chris@16 28
Chris@16 29 #include <cstddef>
Chris@16 30 #include <queue>
Chris@16 31 #include <vector>
Chris@16 32 #include <boost/cstdint.hpp>
Chris@16 33 #include <boost/thread/locks.hpp>
Chris@16 34 #include <boost/thread/mutex.hpp>
Chris@16 35 #include <boost/thread/condition_variable.hpp>
Chris@16 36 #include <boost/thread/thread_time.hpp>
Chris@16 37 #include <boost/date_time/posix_time/posix_time_types.hpp>
Chris@16 38 #include <boost/log/detail/timestamp.hpp>
Chris@101 39 #include <boost/log/detail/enqueued_record.hpp>
Chris@16 40 #include <boost/log/keywords/order.hpp>
Chris@16 41 #include <boost/log/keywords/ordering_window.hpp>
Chris@16 42 #include <boost/log/core/record_view.hpp>
Chris@16 43 #include <boost/log/detail/header.hpp>
Chris@16 44
Chris@16 45 namespace boost {
Chris@16 46
Chris@16 47 BOOST_LOG_OPEN_NAMESPACE
Chris@16 48
Chris@16 49 namespace sinks {
Chris@16 50
Chris@16 51 /*!
Chris@16 52 * \brief Bounded ordering log record queueing strategy
Chris@16 53 *
Chris@16 54 * The \c bounded_ordering_queue class is intended to be used with
Chris@16 55 * the \c asynchronous_sink frontend as a log record queueing strategy.
Chris@16 56 *
Chris@16 57 * This strategy provides the following properties to the record queueing mechanism:
Chris@16 58 *
Chris@16 59 * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter.
Chris@16 60 * \li Upon reaching the size limit, the queue invokes the overflow handling strategy
Chris@16 61 * specified in the \c OverflowStrategyT template parameter to handle the situation.
Chris@16 62 * The library provides overflow handling strategies for most common cases:
Chris@16 63 * \c drop_on_overflow will silently discard the log record, and \c block_on_overflow
Chris@16 64 * will put the enqueueing thread to wait until there is space in the queue.
Chris@16 65 * \li The queue has a fixed latency window. This means that each log record put
Chris@16 66 * into the queue will normally not be dequeued for a certain period of time.
Chris@16 67 * \li The queue performs stable record ordering within the latency window.
Chris@16 68 * The ordering predicate can be specified in the \c OrderT template parameter.
Chris@16 69 */
Chris@16 70 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
Chris@16 71 class bounded_ordering_queue :
Chris@16 72 private OverflowStrategyT
Chris@16 73 {
Chris@16 74 private:
Chris@16 75 typedef OverflowStrategyT overflow_strategy;
Chris@16 76 typedef boost::mutex mutex_type;
Chris@101 77 typedef sinks::aux::enqueued_record enqueued_record;
Chris@16 78
Chris@16 79 typedef std::priority_queue<
Chris@16 80 enqueued_record,
Chris@16 81 std::vector< enqueued_record >,
Chris@101 82 enqueued_record::order< OrderT >
Chris@16 83 > queue_type;
Chris@16 84
Chris@16 85 private:
Chris@16 86 //! Ordering window duration, in milliseconds
Chris@16 87 const uint64_t m_ordering_window;
Chris@16 88 //! Synchronization primitive
Chris@16 89 mutex_type m_mutex;
Chris@16 90 //! Condition to block the consuming thread on
Chris@16 91 condition_variable m_cond;
Chris@16 92 //! Log record queue
Chris@16 93 queue_type m_queue;
Chris@16 94 //! Interruption flag
Chris@16 95 bool m_interruption_requested;
Chris@16 96
Chris@16 97 public:
Chris@16 98 /*!
Chris@16 99 * Returns ordering window size specified during initialization
Chris@16 100 */
Chris@16 101 posix_time::time_duration get_ordering_window() const
Chris@16 102 {
Chris@16 103 return posix_time::milliseconds(m_ordering_window);
Chris@16 104 }
Chris@16 105
Chris@16 106 /*!
Chris@16 107 * Returns default ordering window size.
Chris@16 108 * The default window size is specific to the operating system thread scheduling mechanism.
Chris@16 109 */
Chris@16 110 static posix_time::time_duration get_default_ordering_window()
Chris@16 111 {
Chris@16 112 // The main idea behind this parameter is that the ordering window should be large enough
Chris@16 113 // to allow the frontend to order records from different threads on an attribute
Chris@16 114 // that contains system time. Thus this value should be:
Chris@16 115 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
Chris@16 116 // For instance, on Windows it defaults to around 15-16 ms.
Chris@16 117 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
Chris@16 118 // switch threads on any known OS. It can be tuned for other platforms as needed.
Chris@16 119 return posix_time::milliseconds(30);
Chris@16 120 }
Chris@16 121
Chris@16 122 protected:
Chris@16 123 //! Initializing constructor
Chris@16 124 template< typename ArgsT >
Chris@16 125 explicit bounded_ordering_queue(ArgsT const& args) :
Chris@16 126 m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
Chris@16 127 m_queue(args[keywords::order]),
Chris@16 128 m_interruption_requested(false)
Chris@16 129 {
Chris@16 130 }
Chris@16 131
Chris@16 132 //! Enqueues log record to the queue
Chris@16 133 void enqueue(record_view const& rec)
Chris@16 134 {
Chris@16 135 unique_lock< mutex_type > lock(m_mutex);
Chris@16 136 std::size_t size = m_queue.size();
Chris@16 137 for (; size >= MaxQueueSizeV; size = m_queue.size())
Chris@16 138 {
Chris@16 139 if (!overflow_strategy::on_overflow(rec, lock))
Chris@16 140 return;
Chris@16 141 }
Chris@16 142
Chris@16 143 m_queue.push(enqueued_record(rec));
Chris@16 144 if (size == 0)
Chris@16 145 m_cond.notify_one();
Chris@16 146 }
Chris@16 147
Chris@16 148 //! Attempts to enqueue log record to the queue
Chris@16 149 bool try_enqueue(record_view const& rec)
Chris@16 150 {
Chris@16 151 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
Chris@16 152 if (lock.owns_lock())
Chris@16 153 {
Chris@16 154 const std::size_t size = m_queue.size();
Chris@16 155
Chris@16 156 // Do not invoke the bounding strategy in case of overflow as it may block
Chris@16 157 if (size < MaxQueueSizeV)
Chris@16 158 {
Chris@16 159 m_queue.push(enqueued_record(rec));
Chris@16 160 if (size == 0)
Chris@16 161 m_cond.notify_one();
Chris@16 162 return true;
Chris@16 163 }
Chris@16 164 }
Chris@16 165
Chris@16 166 return false;
Chris@16 167 }
Chris@16 168
Chris@16 169 //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
Chris@16 170 bool try_dequeue_ready(record_view& rec)
Chris@16 171 {
Chris@16 172 lock_guard< mutex_type > lock(m_mutex);
Chris@16 173 const std::size_t size = m_queue.size();
Chris@16 174 if (size > 0)
Chris@16 175 {
Chris@16 176 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 177 enqueued_record const& elem = m_queue.top();
Chris@16 178 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
Chris@16 179 {
Chris@16 180 // We got a new element
Chris@16 181 rec = elem.m_record;
Chris@16 182 m_queue.pop();
Chris@101 183 overflow_strategy::on_queue_space_available();
Chris@16 184 return true;
Chris@16 185 }
Chris@16 186 }
Chris@16 187
Chris@16 188 return false;
Chris@16 189 }
Chris@16 190
Chris@16 191 //! Attempts to dequeue log record from the queue, does not block if the queue is empty
Chris@16 192 bool try_dequeue(record_view& rec)
Chris@16 193 {
Chris@16 194 lock_guard< mutex_type > lock(m_mutex);
Chris@16 195 const std::size_t size = m_queue.size();
Chris@16 196 if (size > 0)
Chris@16 197 {
Chris@16 198 enqueued_record const& elem = m_queue.top();
Chris@16 199 rec = elem.m_record;
Chris@16 200 m_queue.pop();
Chris@101 201 overflow_strategy::on_queue_space_available();
Chris@16 202 return true;
Chris@16 203 }
Chris@16 204
Chris@16 205 return false;
Chris@16 206 }
Chris@16 207
Chris@16 208 //! Dequeues log record from the queue, blocks if the queue is empty
Chris@16 209 bool dequeue_ready(record_view& rec)
Chris@16 210 {
Chris@16 211 unique_lock< mutex_type > lock(m_mutex);
Chris@16 212
Chris@16 213 while (!m_interruption_requested)
Chris@16 214 {
Chris@16 215 const std::size_t size = m_queue.size();
Chris@16 216 if (size > 0)
Chris@16 217 {
Chris@16 218 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
Chris@16 219 enqueued_record const& elem = m_queue.top();
Chris@16 220 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
Chris@16 221 if (difference >= m_ordering_window)
Chris@16 222 {
Chris@16 223 rec = elem.m_record;
Chris@16 224 m_queue.pop();
Chris@101 225 overflow_strategy::on_queue_space_available();
Chris@16 226 return true;
Chris@16 227 }
Chris@16 228 else
Chris@16 229 {
Chris@16 230 // Wait until the element becomes ready to be processed
Chris@16 231 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
Chris@16 232 }
Chris@16 233 }
Chris@16 234 else
Chris@16 235 {
Chris@16 236 m_cond.wait(lock);
Chris@16 237 }
Chris@16 238 }
Chris@16 239 m_interruption_requested = false;
Chris@16 240
Chris@16 241 return false;
Chris@16 242 }
Chris@16 243
Chris@16 244 //! Wakes a thread possibly blocked in the \c dequeue method
Chris@16 245 void interrupt_dequeue()
Chris@16 246 {
Chris@16 247 lock_guard< mutex_type > lock(m_mutex);
Chris@16 248 m_interruption_requested = true;
Chris@16 249 overflow_strategy::interrupt();
Chris@16 250 m_cond.notify_one();
Chris@16 251 }
Chris@16 252 };
Chris@16 253
Chris@16 254 } // namespace sinks
Chris@16 255
Chris@16 256 BOOST_LOG_CLOSE_NAMESPACE // namespace log
Chris@16 257
Chris@16 258 } // namespace boost
Chris@16 259
Chris@16 260 #include <boost/log/detail/footer.hpp>
Chris@16 261
Chris@16 262 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_