comparison DEPENDENCIES/generic/include/boost/log/sinks/bounded_ordering_queue.hpp @ 16:2665513ce2d3

Add boost headers
author Chris Cannam
date Tue, 05 Aug 2014 11:11:38 +0100
parents
children c530137014c0
comparison
equal deleted inserted replaced
15:663ca0da4350 16:2665513ce2d3
1 /*
2 * Copyright Andrey Semashev 2007 - 2013.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7 /*!
8 * \file bounded_ordering_queue.hpp
9 * \author Andrey Semashev
10 * \date 06.01.2012
11 *
12 * The header contains implementation of bounded ordering queueing strategy for
13 * the asynchronous sink frontend.
14 */
15
16 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
17 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
18
19 #include <boost/log/detail/config.hpp>
20
21 #ifdef BOOST_HAS_PRAGMA_ONCE
22 #pragma once
23 #endif
24
25 #if defined(BOOST_LOG_NO_THREADS)
26 #error Boost.Log: This header content is only supported in multithreaded environment
27 #endif
28
29 #include <cstddef>
30 #include <queue>
31 #include <vector>
32 #include <boost/cstdint.hpp>
33 #include <boost/move/core.hpp>
34 #include <boost/move/utility.hpp>
35 #include <boost/thread/locks.hpp>
36 #include <boost/thread/mutex.hpp>
37 #include <boost/thread/condition_variable.hpp>
38 #include <boost/thread/thread_time.hpp>
39 #include <boost/date_time/posix_time/posix_time_types.hpp>
40 #include <boost/log/detail/timestamp.hpp>
41 #include <boost/log/keywords/order.hpp>
42 #include <boost/log/keywords/ordering_window.hpp>
43 #include <boost/log/core/record_view.hpp>
44 #include <boost/log/detail/header.hpp>
45
46 namespace boost {
47
48 BOOST_LOG_OPEN_NAMESPACE
49
50 namespace sinks {
51
52 /*!
53 * \brief Bounded ordering log record queueing strategy
54 *
55 * The \c bounded_ordering_queue class is intended to be used with
56 * the \c asynchronous_sink frontend as a log record queueing strategy.
57 *
58 * This strategy provides the following properties to the record queueing mechanism:
59 *
60 * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter.
61 * \li Upon reaching the size limit, the queue invokes the overflow handling strategy
62 * specified in the \c OverflowStrategyT template parameter to handle the situation.
63 * The library provides overflow handling strategies for most common cases:
64 * \c drop_on_overflow will silently discard the log record, and \c block_on_overflow
65 * will put the enqueueing thread to wait until there is space in the queue.
66 * \li The queue has a fixed latency window. This means that each log record put
67 * into the queue will normally not be dequeued for a certain period of time.
68 * \li The queue performs stable record ordering within the latency window.
69 * The ordering predicate can be specified in the \c OrderT template parameter.
70 */
71 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
72 class bounded_ordering_queue :
73 private OverflowStrategyT
74 {
75 private:
76 typedef OverflowStrategyT overflow_strategy;
77 typedef boost::mutex mutex_type;
78
79 //! Log record with enqueueing timestamp
80 class enqueued_record
81 {
82 BOOST_COPYABLE_AND_MOVABLE(enqueued_record)
83
84 public:
85 //! Ordering predicate
86 struct order :
87 public OrderT
88 {
89 typedef typename OrderT::result_type result_type;
90
91 order() {}
92 order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {}
93 order(OrderT const& that) : OrderT(that) {}
94
95 result_type operator() (enqueued_record const& left, enqueued_record const& right) const
96 {
97 // std::priority_queue requires ordering with semantics of std::greater, so we swap arguments
98 return OrderT::operator() (right.m_record, left.m_record);
99 }
100 };
101
102 boost::log::aux::timestamp m_timestamp;
103 record_view m_record;
104
105 enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record)
106 {
107 }
108 enqueued_record(BOOST_RV_REF(enqueued_record) that) :
109 m_timestamp(that.m_timestamp),
110 m_record(boost::move(that.m_record))
111 {
112 }
113 explicit enqueued_record(record_view const& rec) :
114 m_timestamp(boost::log::aux::get_timestamp()),
115 m_record(rec)
116 {
117 }
118 enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that)
119 {
120 m_timestamp = that.m_timestamp;
121 m_record = that.m_record;
122 return *this;
123 }
124 enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that)
125 {
126 m_timestamp = that.m_timestamp;
127 m_record = boost::move(that.m_record);
128 return *this;
129 }
130 };
131
132 typedef std::priority_queue<
133 enqueued_record,
134 std::vector< enqueued_record >,
135 typename enqueued_record::order
136 > queue_type;
137
138 private:
139 //! Ordering window duration, in milliseconds
140 const uint64_t m_ordering_window;
141 //! Synchronization primitive
142 mutex_type m_mutex;
143 //! Condition to block the consuming thread on
144 condition_variable m_cond;
145 //! Log record queue
146 queue_type m_queue;
147 //! Interruption flag
148 bool m_interruption_requested;
149
150 public:
151 /*!
152 * Returns ordering window size specified during initialization
153 */
154 posix_time::time_duration get_ordering_window() const
155 {
156 return posix_time::milliseconds(m_ordering_window);
157 }
158
159 /*!
160 * Returns default ordering window size.
161 * The default window size is specific to the operating system thread scheduling mechanism.
162 */
163 static posix_time::time_duration get_default_ordering_window()
164 {
165 // The main idea behind this parameter is that the ordering window should be large enough
166 // to allow the frontend to order records from different threads on an attribute
167 // that contains system time. Thus this value should be:
168 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
169 // For instance, on Windows it defaults to around 15-16 ms.
170 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
171 // switch threads on any known OS. It can be tuned for other platforms as needed.
172 return posix_time::milliseconds(30);
173 }
174
175 protected:
176 //! Initializing constructor
177 template< typename ArgsT >
178 explicit bounded_ordering_queue(ArgsT const& args) :
179 m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
180 m_queue(args[keywords::order]),
181 m_interruption_requested(false)
182 {
183 }
184
185 //! Enqueues log record to the queue
186 void enqueue(record_view const& rec)
187 {
188 unique_lock< mutex_type > lock(m_mutex);
189 std::size_t size = m_queue.size();
190 for (; size >= MaxQueueSizeV; size = m_queue.size())
191 {
192 if (!overflow_strategy::on_overflow(rec, lock))
193 return;
194 }
195
196 m_queue.push(enqueued_record(rec));
197 if (size == 0)
198 m_cond.notify_one();
199 }
200
201 //! Attempts to enqueue log record to the queue
202 bool try_enqueue(record_view const& rec)
203 {
204 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
205 if (lock.owns_lock())
206 {
207 const std::size_t size = m_queue.size();
208
209 // Do not invoke the bounding strategy in case of overflow as it may block
210 if (size < MaxQueueSizeV)
211 {
212 m_queue.push(enqueued_record(rec));
213 if (size == 0)
214 m_cond.notify_one();
215 return true;
216 }
217 }
218
219 return false;
220 }
221
222 //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
223 bool try_dequeue_ready(record_view& rec)
224 {
225 lock_guard< mutex_type > lock(m_mutex);
226 const std::size_t size = m_queue.size();
227 if (size > 0)
228 {
229 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
230 enqueued_record const& elem = m_queue.top();
231 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
232 {
233 // We got a new element
234 rec = elem.m_record;
235 m_queue.pop();
236 if (size == MaxQueueSizeV)
237 overflow_strategy::on_queue_space_available();
238 return true;
239 }
240 }
241
242 return false;
243 }
244
245 //! Attempts to dequeue log record from the queue, does not block if the queue is empty
246 bool try_dequeue(record_view& rec)
247 {
248 lock_guard< mutex_type > lock(m_mutex);
249 const std::size_t size = m_queue.size();
250 if (size > 0)
251 {
252 enqueued_record const& elem = m_queue.top();
253 rec = elem.m_record;
254 m_queue.pop();
255 if (size == MaxQueueSizeV)
256 overflow_strategy::on_queue_space_available();
257 return true;
258 }
259
260 return false;
261 }
262
263 //! Dequeues log record from the queue, blocks if the queue is empty
264 bool dequeue_ready(record_view& rec)
265 {
266 unique_lock< mutex_type > lock(m_mutex);
267
268 while (!m_interruption_requested)
269 {
270 const std::size_t size = m_queue.size();
271 if (size > 0)
272 {
273 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
274 enqueued_record const& elem = m_queue.top();
275 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
276 if (difference >= m_ordering_window)
277 {
278 rec = elem.m_record;
279 m_queue.pop();
280 if (size == MaxQueueSizeV)
281 overflow_strategy::on_queue_space_available();
282 return true;
283 }
284 else
285 {
286 // Wait until the element becomes ready to be processed
287 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
288 }
289 }
290 else
291 {
292 m_cond.wait(lock);
293 }
294 }
295 m_interruption_requested = false;
296
297 return false;
298 }
299
300 //! Wakes a thread possibly blocked in the \c dequeue method
301 void interrupt_dequeue()
302 {
303 lock_guard< mutex_type > lock(m_mutex);
304 m_interruption_requested = true;
305 overflow_strategy::interrupt();
306 m_cond.notify_one();
307 }
308 };
309
310 } // namespace sinks
311
312 BOOST_LOG_CLOSE_NAMESPACE // namespace log
313
314 } // namespace boost
315
316 #include <boost/log/detail/footer.hpp>
317
318 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_