comparison DEPENDENCIES/generic/include/boost/log/sinks/unbounded_ordering_queue.hpp @ 16:2665513ce2d3

Add boost headers
author Chris Cannam
date Tue, 05 Aug 2014 11:11:38 +0100
parents
children c530137014c0
comparison
equal deleted inserted replaced
15:663ca0da4350 16:2665513ce2d3
1 /*
2 * Copyright Andrey Semashev 2007 - 2013.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7 /*!
8 * \file unbounded_ordering_queue.hpp
9 * \author Andrey Semashev
10 * \date 24.07.2011
11 *
12 * The header contains implementation of unbounded ordering record queueing strategy for
13 * the asynchronous sink frontend.
14 */
15
16 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
17 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
18
19 #include <boost/log/detail/config.hpp>
20
21 #ifdef BOOST_HAS_PRAGMA_ONCE
22 #pragma once
23 #endif
24
25 #if defined(BOOST_LOG_NO_THREADS)
26 #error Boost.Log: This header content is only supported in multithreaded environment
27 #endif
28
29 #include <queue>
30 #include <vector>
31 #include <boost/cstdint.hpp>
32 #include <boost/move/core.hpp>
33 #include <boost/move/utility.hpp>
34 #include <boost/thread/locks.hpp>
35 #include <boost/thread/mutex.hpp>
36 #include <boost/thread/condition_variable.hpp>
37 #include <boost/thread/thread_time.hpp>
38 #include <boost/date_time/posix_time/posix_time_types.hpp>
39 #include <boost/log/detail/timestamp.hpp>
40 #include <boost/log/keywords/order.hpp>
41 #include <boost/log/keywords/ordering_window.hpp>
42 #include <boost/log/core/record_view.hpp>
43 #include <boost/log/detail/header.hpp>
44
45 namespace boost {
46
47 BOOST_LOG_OPEN_NAMESPACE
48
49 namespace sinks {
50
51 /*!
52 * \brief Unbounded ordering log record queueing strategy
53 *
54 * The \c unbounded_ordering_queue class is intended to be used with
55 * the \c asynchronous_sink frontend as a log record queueing strategy.
56 *
57 * This strategy provides the following properties to the record queueing mechanism:
58 *
59 * \li The queue has no size limits.
60 * \li The queue has a fixed latency window. This means that each log record put
61 * into the queue will normally not be dequeued for a certain period of time.
62 * \li The queue performs stable record ordering within the latency window.
63 * The ordering predicate can be specified in the \c OrderT template parameter.
64 *
65 * Since this queue has no size limits, it may grow uncontrollably if sink backends
66 * dequeue log records not fast enough. When this is an issue, it is recommended to
67 * use one of the bounded strategies.
68 */
69 template< typename OrderT >
70 class unbounded_ordering_queue
71 {
72 private:
73 typedef boost::mutex mutex_type;
74
75 //! Log record with enqueueing timestamp
76 class enqueued_record
77 {
78 BOOST_COPYABLE_AND_MOVABLE(enqueued_record)
79
80 public:
81 //! Ordering predicate
82 struct order :
83 public OrderT
84 {
85 typedef typename OrderT::result_type result_type;
86
87 order() {}
88 order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {}
89 order(OrderT const& that) : OrderT(that) {}
90
91 result_type operator() (enqueued_record const& left, enqueued_record const& right) const
92 {
93 // std::priority_queue requires ordering with semantics of std::greater, so we swap arguments
94 return OrderT::operator() (right.m_record, left.m_record);
95 }
96 };
97
98 boost::log::aux::timestamp m_timestamp;
99 record_view m_record;
100
101 enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record)
102 {
103 }
104 enqueued_record(BOOST_RV_REF(enqueued_record) that) :
105 m_timestamp(that.m_timestamp),
106 m_record(boost::move(that.m_record))
107 {
108 }
109 explicit enqueued_record(record_view const& rec) :
110 m_timestamp(boost::log::aux::get_timestamp()),
111 m_record(rec)
112 {
113 }
114 enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that)
115 {
116 m_timestamp = that.m_timestamp;
117 m_record = that.m_record;
118 return *this;
119 }
120 enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that)
121 {
122 m_timestamp = that.m_timestamp;
123 m_record = boost::move(that.m_record);
124 return *this;
125 }
126 };
127
128 typedef std::priority_queue<
129 enqueued_record,
130 std::vector< enqueued_record >,
131 typename enqueued_record::order
132 > queue_type;
133
134 private:
135 //! Ordering window duration, in milliseconds
136 const uint64_t m_ordering_window;
137 //! Synchronization mutex
138 mutex_type m_mutex;
139 //! Condition for blocking
140 condition_variable m_cond;
141 //! Thread-safe queue
142 queue_type m_queue;
143 //! Interruption flag
144 bool m_interruption_requested;
145
146 public:
147 /*!
148 * Returns ordering window size specified during initialization
149 */
150 posix_time::time_duration get_ordering_window() const
151 {
152 return posix_time::milliseconds(m_ordering_window);
153 }
154
155 /*!
156 * Returns default ordering window size.
157 * The default window size is specific to the operating system thread scheduling mechanism.
158 */
159 static posix_time::time_duration get_default_ordering_window()
160 {
161 // The main idea behind this parameter is that the ordering window should be large enough
162 // to allow the frontend to order records from different threads on an attribute
163 // that contains system time. Thus this value should be:
164 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
165 // For instance, on Windows it defaults to around 15-16 ms.
166 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
167 // switch threads on any known OS. It can be tuned for other platforms as needed.
168 return posix_time::milliseconds(30);
169 }
170
171 protected:
172 //! Initializing constructor
173 template< typename ArgsT >
174 explicit unbounded_ordering_queue(ArgsT const& args) :
175 m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
176 m_queue(args[keywords::order]),
177 m_interruption_requested(false)
178 {
179 }
180
181 //! Enqueues log record to the queue
182 void enqueue(record_view const& rec)
183 {
184 lock_guard< mutex_type > lock(m_mutex);
185 enqueue_unlocked(rec);
186 }
187
188 //! Attempts to enqueue log record to the queue
189 bool try_enqueue(record_view const& rec)
190 {
191 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
192 if (lock.owns_lock())
193 {
194 enqueue_unlocked(rec);
195 return true;
196 }
197 else
198 return false;
199 }
200
201 //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed
202 bool try_dequeue_ready(record_view& rec)
203 {
204 lock_guard< mutex_type > lock(m_mutex);
205 if (!m_queue.empty())
206 {
207 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
208 enqueued_record const& elem = m_queue.top();
209 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
210 {
211 // We got a new element
212 rec = elem.m_record;
213 m_queue.pop();
214 return true;
215 }
216 }
217
218 return false;
219 }
220
221 //! Attempts to dequeue log record from the queue, does not block.
222 bool try_dequeue(record_view& rec)
223 {
224 lock_guard< mutex_type > lock(m_mutex);
225 if (!m_queue.empty())
226 {
227 enqueued_record const& elem = m_queue.top();
228 rec = elem.m_record;
229 m_queue.pop();
230 return true;
231 }
232
233 return false;
234 }
235
236 //! Dequeues log record from the queue, blocks if no log records are ready to be processed
237 bool dequeue_ready(record_view& rec)
238 {
239 unique_lock< mutex_type > lock(m_mutex);
240 while (!m_interruption_requested)
241 {
242 if (!m_queue.empty())
243 {
244 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
245 enqueued_record const& elem = m_queue.top();
246 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
247 if (difference >= m_ordering_window)
248 {
249 // We got a new element
250 rec = elem.m_record;
251 m_queue.pop();
252 return true;
253 }
254 else
255 {
256 // Wait until the element becomes ready to be processed
257 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
258 }
259 }
260 else
261 {
262 // Wait for an element to come
263 m_cond.wait(lock);
264 }
265 }
266 m_interruption_requested = false;
267
268 return false;
269 }
270
271 //! Wakes a thread possibly blocked in the \c dequeue method
272 void interrupt_dequeue()
273 {
274 lock_guard< mutex_type > lock(m_mutex);
275 m_interruption_requested = true;
276 m_cond.notify_one();
277 }
278
279 private:
280 //! Enqueues a log record
281 void enqueue_unlocked(record_view const& rec)
282 {
283 const bool was_empty = m_queue.empty();
284 m_queue.push(enqueued_record(rec));
285 if (was_empty)
286 m_cond.notify_one();
287 }
288 };
289
290 } // namespace sinks
291
292 BOOST_LOG_CLOSE_NAMESPACE // namespace log
293
294 } // namespace boost
295
296 #include <boost/log/detail/footer.hpp>
297
298 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_