Mercurial > hg > vamp-build-and-test
comparison DEPENDENCIES/generic/include/boost/log/sinks/bounded_ordering_queue.hpp @ 16:2665513ce2d3
Add boost headers
author | Chris Cannam |
---|---|
date | Tue, 05 Aug 2014 11:11:38 +0100 |
parents | |
children | c530137014c0 |
comparison
equal
deleted
inserted
replaced
15:663ca0da4350 | 16:2665513ce2d3 |
---|---|
1 /* | |
2 * Copyright Andrey Semashev 2007 - 2013. | |
3 * Distributed under the Boost Software License, Version 1.0. | |
4 * (See accompanying file LICENSE_1_0.txt or copy at | |
5 * http://www.boost.org/LICENSE_1_0.txt) | |
6 */ | |
7 /*! | |
8 * \file bounded_ordering_queue.hpp | |
9 * \author Andrey Semashev | |
10 * \date 06.01.2012 | |
11 * | |
12 * The header contains implementation of bounded ordering queueing strategy for | |
13 * the asynchronous sink frontend. | |
14 */ | |
15 | |
16 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
17 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
18 | |
19 #include <boost/log/detail/config.hpp> | |
20 | |
21 #ifdef BOOST_HAS_PRAGMA_ONCE | |
22 #pragma once | |
23 #endif | |
24 | |
25 #if defined(BOOST_LOG_NO_THREADS) | |
26 #error Boost.Log: This header content is only supported in multithreaded environment | |
27 #endif | |
28 | |
29 #include <cstddef> | |
30 #include <queue> | |
31 #include <vector> | |
32 #include <boost/cstdint.hpp> | |
33 #include <boost/move/core.hpp> | |
34 #include <boost/move/utility.hpp> | |
35 #include <boost/thread/locks.hpp> | |
36 #include <boost/thread/mutex.hpp> | |
37 #include <boost/thread/condition_variable.hpp> | |
38 #include <boost/thread/thread_time.hpp> | |
39 #include <boost/date_time/posix_time/posix_time_types.hpp> | |
40 #include <boost/log/detail/timestamp.hpp> | |
41 #include <boost/log/keywords/order.hpp> | |
42 #include <boost/log/keywords/ordering_window.hpp> | |
43 #include <boost/log/core/record_view.hpp> | |
44 #include <boost/log/detail/header.hpp> | |
45 | |
46 namespace boost { | |
47 | |
48 BOOST_LOG_OPEN_NAMESPACE | |
49 | |
50 namespace sinks { | |
51 | |
52 /*! | |
53 * \brief Bounded ordering log record queueing strategy | |
54 * | |
55 * The \c bounded_ordering_queue class is intended to be used with | |
56 * the \c asynchronous_sink frontend as a log record queueing strategy. | |
57 * | |
58 * This strategy provides the following properties to the record queueing mechanism: | |
59 * | |
60 * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter. | |
61 * \li Upon reaching the size limit, the queue invokes the overflow handling strategy | |
62 * specified in the \c OverflowStrategyT template parameter to handle the situation. | |
63 * The library provides overflow handling strategies for most common cases: | |
64 * \c drop_on_overflow will silently discard the log record, and \c block_on_overflow | |
65 * will put the enqueueing thread to wait until there is space in the queue. | |
66 * \li The queue has a fixed latency window. This means that each log record put | |
67 * into the queue will normally not be dequeued for a certain period of time. | |
68 * \li The queue performs stable record ordering within the latency window. | |
69 * The ordering predicate can be specified in the \c OrderT template parameter. | |
70 */ | |
71 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT > | |
72 class bounded_ordering_queue : | |
73 private OverflowStrategyT | |
74 { | |
75 private: | |
76 typedef OverflowStrategyT overflow_strategy; | |
77 typedef boost::mutex mutex_type; | |
78 | |
79 //! Log record with enqueueing timestamp | |
80 class enqueued_record | |
81 { | |
82 BOOST_COPYABLE_AND_MOVABLE(enqueued_record) | |
83 | |
84 public: | |
85 //! Ordering predicate | |
86 struct order : | |
87 public OrderT | |
88 { | |
89 typedef typename OrderT::result_type result_type; | |
90 | |
91 order() {} | |
92 order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {} | |
93 order(OrderT const& that) : OrderT(that) {} | |
94 | |
95 result_type operator() (enqueued_record const& left, enqueued_record const& right) const | |
96 { | |
97 // std::priority_queue requires ordering with semantics of std::greater, so we swap arguments | |
98 return OrderT::operator() (right.m_record, left.m_record); | |
99 } | |
100 }; | |
101 | |
102 boost::log::aux::timestamp m_timestamp; | |
103 record_view m_record; | |
104 | |
105 enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record) | |
106 { | |
107 } | |
108 enqueued_record(BOOST_RV_REF(enqueued_record) that) : | |
109 m_timestamp(that.m_timestamp), | |
110 m_record(boost::move(that.m_record)) | |
111 { | |
112 } | |
113 explicit enqueued_record(record_view const& rec) : | |
114 m_timestamp(boost::log::aux::get_timestamp()), | |
115 m_record(rec) | |
116 { | |
117 } | |
118 enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that) | |
119 { | |
120 m_timestamp = that.m_timestamp; | |
121 m_record = that.m_record; | |
122 return *this; | |
123 } | |
124 enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that) | |
125 { | |
126 m_timestamp = that.m_timestamp; | |
127 m_record = boost::move(that.m_record); | |
128 return *this; | |
129 } | |
130 }; | |
131 | |
132 typedef std::priority_queue< | |
133 enqueued_record, | |
134 std::vector< enqueued_record >, | |
135 typename enqueued_record::order | |
136 > queue_type; | |
137 | |
138 private: | |
139 //! Ordering window duration, in milliseconds | |
140 const uint64_t m_ordering_window; | |
141 //! Synchronization primitive | |
142 mutex_type m_mutex; | |
143 //! Condition to block the consuming thread on | |
144 condition_variable m_cond; | |
145 //! Log record queue | |
146 queue_type m_queue; | |
147 //! Interruption flag | |
148 bool m_interruption_requested; | |
149 | |
150 public: | |
151 /*! | |
152 * Returns ordering window size specified during initialization | |
153 */ | |
154 posix_time::time_duration get_ordering_window() const | |
155 { | |
156 return posix_time::milliseconds(m_ordering_window); | |
157 } | |
158 | |
159 /*! | |
160 * Returns default ordering window size. | |
161 * The default window size is specific to the operating system thread scheduling mechanism. | |
162 */ | |
163 static posix_time::time_duration get_default_ordering_window() | |
164 { | |
165 // The main idea behind this parameter is that the ordering window should be large enough | |
166 // to allow the frontend to order records from different threads on an attribute | |
167 // that contains system time. Thus this value should be: | |
168 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. | |
169 // For instance, on Windows it defaults to around 15-16 ms. | |
170 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to | |
171 // switch threads on any known OS. It can be tuned for other platforms as needed. | |
172 return posix_time::milliseconds(30); | |
173 } | |
174 | |
175 protected: | |
176 //! Initializing constructor | |
177 template< typename ArgsT > | |
178 explicit bounded_ordering_queue(ArgsT const& args) : | |
179 m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()), | |
180 m_queue(args[keywords::order]), | |
181 m_interruption_requested(false) | |
182 { | |
183 } | |
184 | |
185 //! Enqueues log record to the queue | |
186 void enqueue(record_view const& rec) | |
187 { | |
188 unique_lock< mutex_type > lock(m_mutex); | |
189 std::size_t size = m_queue.size(); | |
190 for (; size >= MaxQueueSizeV; size = m_queue.size()) | |
191 { | |
192 if (!overflow_strategy::on_overflow(rec, lock)) | |
193 return; | |
194 } | |
195 | |
196 m_queue.push(enqueued_record(rec)); | |
197 if (size == 0) | |
198 m_cond.notify_one(); | |
199 } | |
200 | |
201 //! Attempts to enqueue log record to the queue | |
202 bool try_enqueue(record_view const& rec) | |
203 { | |
204 unique_lock< mutex_type > lock(m_mutex, try_to_lock); | |
205 if (lock.owns_lock()) | |
206 { | |
207 const std::size_t size = m_queue.size(); | |
208 | |
209 // Do not invoke the bounding strategy in case of overflow as it may block | |
210 if (size < MaxQueueSizeV) | |
211 { | |
212 m_queue.push(enqueued_record(rec)); | |
213 if (size == 0) | |
214 m_cond.notify_one(); | |
215 return true; | |
216 } | |
217 } | |
218 | |
219 return false; | |
220 } | |
221 | |
222 //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty | |
223 bool try_dequeue_ready(record_view& rec) | |
224 { | |
225 lock_guard< mutex_type > lock(m_mutex); | |
226 const std::size_t size = m_queue.size(); | |
227 if (size > 0) | |
228 { | |
229 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
230 enqueued_record const& elem = m_queue.top(); | |
231 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) | |
232 { | |
233 // We got a new element | |
234 rec = elem.m_record; | |
235 m_queue.pop(); | |
236 if (size == MaxQueueSizeV) | |
237 overflow_strategy::on_queue_space_available(); | |
238 return true; | |
239 } | |
240 } | |
241 | |
242 return false; | |
243 } | |
244 | |
245 //! Attempts to dequeue log record from the queue, does not block if the queue is empty | |
246 bool try_dequeue(record_view& rec) | |
247 { | |
248 lock_guard< mutex_type > lock(m_mutex); | |
249 const std::size_t size = m_queue.size(); | |
250 if (size > 0) | |
251 { | |
252 enqueued_record const& elem = m_queue.top(); | |
253 rec = elem.m_record; | |
254 m_queue.pop(); | |
255 if (size == MaxQueueSizeV) | |
256 overflow_strategy::on_queue_space_available(); | |
257 return true; | |
258 } | |
259 | |
260 return false; | |
261 } | |
262 | |
263 //! Dequeues log record from the queue, blocks if the queue is empty | |
264 bool dequeue_ready(record_view& rec) | |
265 { | |
266 unique_lock< mutex_type > lock(m_mutex); | |
267 | |
268 while (!m_interruption_requested) | |
269 { | |
270 const std::size_t size = m_queue.size(); | |
271 if (size > 0) | |
272 { | |
273 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
274 enqueued_record const& elem = m_queue.top(); | |
275 const uint64_t difference = (now - elem.m_timestamp).milliseconds(); | |
276 if (difference >= m_ordering_window) | |
277 { | |
278 rec = elem.m_record; | |
279 m_queue.pop(); | |
280 if (size == MaxQueueSizeV) | |
281 overflow_strategy::on_queue_space_available(); | |
282 return true; | |
283 } | |
284 else | |
285 { | |
286 // Wait until the element becomes ready to be processed | |
287 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); | |
288 } | |
289 } | |
290 else | |
291 { | |
292 m_cond.wait(lock); | |
293 } | |
294 } | |
295 m_interruption_requested = false; | |
296 | |
297 return false; | |
298 } | |
299 | |
300 //! Wakes a thread possibly blocked in the \c dequeue method | |
301 void interrupt_dequeue() | |
302 { | |
303 lock_guard< mutex_type > lock(m_mutex); | |
304 m_interruption_requested = true; | |
305 overflow_strategy::interrupt(); | |
306 m_cond.notify_one(); | |
307 } | |
308 }; | |
309 | |
310 } // namespace sinks | |
311 | |
312 BOOST_LOG_CLOSE_NAMESPACE // namespace log | |
313 | |
314 } // namespace boost | |
315 | |
316 #include <boost/log/detail/footer.hpp> | |
317 | |
318 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ |