Mercurial > hg > vamp-build-and-test
comparison DEPENDENCIES/generic/include/boost/log/sinks/unbounded_ordering_queue.hpp @ 16:2665513ce2d3
Add boost headers
author | Chris Cannam |
---|---|
date | Tue, 05 Aug 2014 11:11:38 +0100 |
parents | |
children | c530137014c0 |
comparison
equal
deleted
inserted
replaced
15:663ca0da4350 | 16:2665513ce2d3 |
---|---|
1 /* | |
2 * Copyright Andrey Semashev 2007 - 2013. | |
3 * Distributed under the Boost Software License, Version 1.0. | |
4 * (See accompanying file LICENSE_1_0.txt or copy at | |
5 * http://www.boost.org/LICENSE_1_0.txt) | |
6 */ | |
7 /*! | |
8 * \file unbounded_ordering_queue.hpp | |
9 * \author Andrey Semashev | |
10 * \date 24.07.2011 | |
11 * | |
12 * The header contains implementation of unbounded ordering record queueing strategy for | |
13 * the asynchronous sink frontend. | |
14 */ | |
15 | |
16 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
17 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
18 | |
19 #include <boost/log/detail/config.hpp> | |
20 | |
21 #ifdef BOOST_HAS_PRAGMA_ONCE | |
22 #pragma once | |
23 #endif | |
24 | |
25 #if defined(BOOST_LOG_NO_THREADS) | |
26 #error Boost.Log: This header content is only supported in multithreaded environment | |
27 #endif | |
28 | |
29 #include <queue> | |
30 #include <vector> | |
31 #include <boost/cstdint.hpp> | |
32 #include <boost/move/core.hpp> | |
33 #include <boost/move/utility.hpp> | |
34 #include <boost/thread/locks.hpp> | |
35 #include <boost/thread/mutex.hpp> | |
36 #include <boost/thread/condition_variable.hpp> | |
37 #include <boost/thread/thread_time.hpp> | |
38 #include <boost/date_time/posix_time/posix_time_types.hpp> | |
39 #include <boost/log/detail/timestamp.hpp> | |
40 #include <boost/log/keywords/order.hpp> | |
41 #include <boost/log/keywords/ordering_window.hpp> | |
42 #include <boost/log/core/record_view.hpp> | |
43 #include <boost/log/detail/header.hpp> | |
44 | |
45 namespace boost { | |
46 | |
47 BOOST_LOG_OPEN_NAMESPACE | |
48 | |
49 namespace sinks { | |
50 | |
51 /*! | |
52 * \brief Unbounded ordering log record queueing strategy | |
53 * | |
54 * The \c unbounded_ordering_queue class is intended to be used with | |
55 * the \c asynchronous_sink frontend as a log record queueing strategy. | |
56 * | |
57 * This strategy provides the following properties to the record queueing mechanism: | |
58 * | |
59 * \li The queue has no size limits. | |
60 * \li The queue has a fixed latency window. This means that each log record put | |
61 * into the queue will normally not be dequeued for a certain period of time. | |
62 * \li The queue performs stable record ordering within the latency window. | |
63 * The ordering predicate can be specified in the \c OrderT template parameter. | |
64 * | |
65 * Since this queue has no size limits, it may grow uncontrollably if sink backends | |
66 * dequeue log records not fast enough. When this is an issue, it is recommended to | |
67 * use one of the bounded strategies. | |
68 */ | |
69 template< typename OrderT > | |
70 class unbounded_ordering_queue | |
71 { | |
72 private: | |
73 typedef boost::mutex mutex_type; | |
74 | |
75 //! Log record with enqueueing timestamp | |
76 class enqueued_record | |
77 { | |
78 BOOST_COPYABLE_AND_MOVABLE(enqueued_record) | |
79 | |
80 public: | |
81 //! Ordering predicate | |
82 struct order : | |
83 public OrderT | |
84 { | |
85 typedef typename OrderT::result_type result_type; | |
86 | |
87 order() {} | |
88 order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {} | |
89 order(OrderT const& that) : OrderT(that) {} | |
90 | |
91 result_type operator() (enqueued_record const& left, enqueued_record const& right) const | |
92 { | |
93 // std::priority_queue requires ordering with semantics of std::greater, so we swap arguments | |
94 return OrderT::operator() (right.m_record, left.m_record); | |
95 } | |
96 }; | |
97 | |
98 boost::log::aux::timestamp m_timestamp; | |
99 record_view m_record; | |
100 | |
101 enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record) | |
102 { | |
103 } | |
104 enqueued_record(BOOST_RV_REF(enqueued_record) that) : | |
105 m_timestamp(that.m_timestamp), | |
106 m_record(boost::move(that.m_record)) | |
107 { | |
108 } | |
109 explicit enqueued_record(record_view const& rec) : | |
110 m_timestamp(boost::log::aux::get_timestamp()), | |
111 m_record(rec) | |
112 { | |
113 } | |
114 enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that) | |
115 { | |
116 m_timestamp = that.m_timestamp; | |
117 m_record = that.m_record; | |
118 return *this; | |
119 } | |
120 enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that) | |
121 { | |
122 m_timestamp = that.m_timestamp; | |
123 m_record = boost::move(that.m_record); | |
124 return *this; | |
125 } | |
126 }; | |
127 | |
128 typedef std::priority_queue< | |
129 enqueued_record, | |
130 std::vector< enqueued_record >, | |
131 typename enqueued_record::order | |
132 > queue_type; | |
133 | |
134 private: | |
135 //! Ordering window duration, in milliseconds | |
136 const uint64_t m_ordering_window; | |
137 //! Synchronization mutex | |
138 mutex_type m_mutex; | |
139 //! Condition for blocking | |
140 condition_variable m_cond; | |
141 //! Thread-safe queue | |
142 queue_type m_queue; | |
143 //! Interruption flag | |
144 bool m_interruption_requested; | |
145 | |
146 public: | |
147 /*! | |
148 * Returns ordering window size specified during initialization | |
149 */ | |
150 posix_time::time_duration get_ordering_window() const | |
151 { | |
152 return posix_time::milliseconds(m_ordering_window); | |
153 } | |
154 | |
155 /*! | |
156 * Returns default ordering window size. | |
157 * The default window size is specific to the operating system thread scheduling mechanism. | |
158 */ | |
159 static posix_time::time_duration get_default_ordering_window() | |
160 { | |
161 // The main idea behind this parameter is that the ordering window should be large enough | |
162 // to allow the frontend to order records from different threads on an attribute | |
163 // that contains system time. Thus this value should be: | |
164 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. | |
165 // For instance, on Windows it defaults to around 15-16 ms. | |
166 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to | |
167 // switch threads on any known OS. It can be tuned for other platforms as needed. | |
168 return posix_time::milliseconds(30); | |
169 } | |
170 | |
171 protected: | |
172 //! Initializing constructor | |
173 template< typename ArgsT > | |
174 explicit unbounded_ordering_queue(ArgsT const& args) : | |
175 m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()), | |
176 m_queue(args[keywords::order]), | |
177 m_interruption_requested(false) | |
178 { | |
179 } | |
180 | |
181 //! Enqueues log record to the queue | |
182 void enqueue(record_view const& rec) | |
183 { | |
184 lock_guard< mutex_type > lock(m_mutex); | |
185 enqueue_unlocked(rec); | |
186 } | |
187 | |
188 //! Attempts to enqueue log record to the queue | |
189 bool try_enqueue(record_view const& rec) | |
190 { | |
191 unique_lock< mutex_type > lock(m_mutex, try_to_lock); | |
192 if (lock.owns_lock()) | |
193 { | |
194 enqueue_unlocked(rec); | |
195 return true; | |
196 } | |
197 else | |
198 return false; | |
199 } | |
200 | |
201 //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed | |
202 bool try_dequeue_ready(record_view& rec) | |
203 { | |
204 lock_guard< mutex_type > lock(m_mutex); | |
205 if (!m_queue.empty()) | |
206 { | |
207 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
208 enqueued_record const& elem = m_queue.top(); | |
209 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) | |
210 { | |
211 // We got a new element | |
212 rec = elem.m_record; | |
213 m_queue.pop(); | |
214 return true; | |
215 } | |
216 } | |
217 | |
218 return false; | |
219 } | |
220 | |
221 //! Attempts to dequeue log record from the queue, does not block. | |
222 bool try_dequeue(record_view& rec) | |
223 { | |
224 lock_guard< mutex_type > lock(m_mutex); | |
225 if (!m_queue.empty()) | |
226 { | |
227 enqueued_record const& elem = m_queue.top(); | |
228 rec = elem.m_record; | |
229 m_queue.pop(); | |
230 return true; | |
231 } | |
232 | |
233 return false; | |
234 } | |
235 | |
236 //! Dequeues log record from the queue, blocks if no log records are ready to be processed | |
237 bool dequeue_ready(record_view& rec) | |
238 { | |
239 unique_lock< mutex_type > lock(m_mutex); | |
240 while (!m_interruption_requested) | |
241 { | |
242 if (!m_queue.empty()) | |
243 { | |
244 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
245 enqueued_record const& elem = m_queue.top(); | |
246 const uint64_t difference = (now - elem.m_timestamp).milliseconds(); | |
247 if (difference >= m_ordering_window) | |
248 { | |
249 // We got a new element | |
250 rec = elem.m_record; | |
251 m_queue.pop(); | |
252 return true; | |
253 } | |
254 else | |
255 { | |
256 // Wait until the element becomes ready to be processed | |
257 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); | |
258 } | |
259 } | |
260 else | |
261 { | |
262 // Wait for an element to come | |
263 m_cond.wait(lock); | |
264 } | |
265 } | |
266 m_interruption_requested = false; | |
267 | |
268 return false; | |
269 } | |
270 | |
271 //! Wakes a thread possibly blocked in the \c dequeue method | |
272 void interrupt_dequeue() | |
273 { | |
274 lock_guard< mutex_type > lock(m_mutex); | |
275 m_interruption_requested = true; | |
276 m_cond.notify_one(); | |
277 } | |
278 | |
279 private: | |
280 //! Enqueues a log record | |
281 void enqueue_unlocked(record_view const& rec) | |
282 { | |
283 const bool was_empty = m_queue.empty(); | |
284 m_queue.push(enqueued_record(rec)); | |
285 if (was_empty) | |
286 m_cond.notify_one(); | |
287 } | |
288 }; | |
289 | |
290 } // namespace sinks | |
291 | |
292 BOOST_LOG_CLOSE_NAMESPACE // namespace log | |
293 | |
294 } // namespace boost | |
295 | |
296 #include <boost/log/detail/footer.hpp> | |
297 | |
298 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ |