Chris@16
|
1 /*
|
Chris@101
|
2 * Copyright Andrey Semashev 2007 - 2015.
|
Chris@16
|
3 * Distributed under the Boost Software License, Version 1.0.
|
Chris@16
|
4 * (See accompanying file LICENSE_1_0.txt or copy at
|
Chris@16
|
5 * http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
6 */
|
Chris@16
|
7 /*!
|
Chris@16
|
8 * \file unbounded_ordering_queue.hpp
|
Chris@16
|
9 * \author Andrey Semashev
|
Chris@16
|
10 * \date 24.07.2011
|
Chris@16
|
11 *
|
Chris@16
|
12 * The header contains implementation of unbounded ordering record queueing strategy for
|
Chris@16
|
13 * the asynchronous sink frontend.
|
Chris@16
|
14 */
|
Chris@16
|
15
|
Chris@16
|
16 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
|
Chris@16
|
17 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
|
Chris@16
|
18
|
Chris@16
|
19 #include <boost/log/detail/config.hpp>
|
Chris@16
|
20
|
Chris@16
|
21 #ifdef BOOST_HAS_PRAGMA_ONCE
|
Chris@16
|
22 #pragma once
|
Chris@16
|
23 #endif
|
Chris@16
|
24
|
Chris@16
|
25 #if defined(BOOST_LOG_NO_THREADS)
|
Chris@16
|
26 #error Boost.Log: This header content is only supported in multithreaded environment
|
Chris@16
|
27 #endif
|
Chris@16
|
28
|
Chris@16
|
29 #include <queue>
|
Chris@16
|
30 #include <vector>
|
Chris@16
|
31 #include <boost/cstdint.hpp>
|
Chris@16
|
32 #include <boost/thread/locks.hpp>
|
Chris@16
|
33 #include <boost/thread/mutex.hpp>
|
Chris@16
|
34 #include <boost/thread/condition_variable.hpp>
|
Chris@16
|
35 #include <boost/thread/thread_time.hpp>
|
Chris@16
|
36 #include <boost/date_time/posix_time/posix_time_types.hpp>
|
Chris@16
|
37 #include <boost/log/detail/timestamp.hpp>
|
Chris@101
|
38 #include <boost/log/detail/enqueued_record.hpp>
|
Chris@16
|
39 #include <boost/log/keywords/order.hpp>
|
Chris@16
|
40 #include <boost/log/keywords/ordering_window.hpp>
|
Chris@16
|
41 #include <boost/log/core/record_view.hpp>
|
Chris@16
|
42 #include <boost/log/detail/header.hpp>
|
Chris@16
|
43
|
Chris@16
|
44 namespace boost {
|
Chris@16
|
45
|
Chris@16
|
46 BOOST_LOG_OPEN_NAMESPACE
|
Chris@16
|
47
|
Chris@16
|
48 namespace sinks {
|
Chris@16
|
49
|
Chris@16
|
50 /*!
|
Chris@16
|
51 * \brief Unbounded ordering log record queueing strategy
|
Chris@16
|
52 *
|
Chris@16
|
53 * The \c unbounded_ordering_queue class is intended to be used with
|
Chris@16
|
54 * the \c asynchronous_sink frontend as a log record queueing strategy.
|
Chris@16
|
55 *
|
Chris@16
|
56 * This strategy provides the following properties to the record queueing mechanism:
|
Chris@16
|
57 *
|
Chris@16
|
58 * \li The queue has no size limits.
|
Chris@16
|
59 * \li The queue has a fixed latency window. This means that each log record put
|
Chris@16
|
60 * into the queue will normally not be dequeued for a certain period of time.
|
Chris@16
|
61 * \li The queue performs stable record ordering within the latency window.
|
Chris@16
|
62 * The ordering predicate can be specified in the \c OrderT template parameter.
|
Chris@16
|
63 *
|
Chris@16
|
64 * Since this queue has no size limits, it may grow uncontrollably if sink backends
|
Chris@16
|
65 * dequeue log records not fast enough. When this is an issue, it is recommended to
|
Chris@16
|
66 * use one of the bounded strategies.
|
Chris@16
|
67 */
|
Chris@16
|
68 template< typename OrderT >
|
Chris@16
|
69 class unbounded_ordering_queue
|
Chris@16
|
70 {
|
Chris@16
|
71 private:
|
Chris@16
|
72 typedef boost::mutex mutex_type;
|
Chris@101
|
73 typedef sinks::aux::enqueued_record enqueued_record;
|
Chris@16
|
74
|
Chris@16
|
75 typedef std::priority_queue<
|
Chris@16
|
76 enqueued_record,
|
Chris@16
|
77 std::vector< enqueued_record >,
|
Chris@101
|
78 enqueued_record::order< OrderT >
|
Chris@16
|
79 > queue_type;
|
Chris@16
|
80
|
Chris@16
|
81 private:
|
Chris@16
|
82 //! Ordering window duration, in milliseconds
|
Chris@16
|
83 const uint64_t m_ordering_window;
|
Chris@16
|
84 //! Synchronization mutex
|
Chris@16
|
85 mutex_type m_mutex;
|
Chris@16
|
86 //! Condition for blocking
|
Chris@16
|
87 condition_variable m_cond;
|
Chris@16
|
88 //! Thread-safe queue
|
Chris@16
|
89 queue_type m_queue;
|
Chris@16
|
90 //! Interruption flag
|
Chris@16
|
91 bool m_interruption_requested;
|
Chris@16
|
92
|
Chris@16
|
93 public:
|
Chris@16
|
94 /*!
|
Chris@16
|
95 * Returns ordering window size specified during initialization
|
Chris@16
|
96 */
|
Chris@16
|
97 posix_time::time_duration get_ordering_window() const
|
Chris@16
|
98 {
|
Chris@16
|
99 return posix_time::milliseconds(m_ordering_window);
|
Chris@16
|
100 }
|
Chris@16
|
101
|
Chris@16
|
102 /*!
|
Chris@16
|
103 * Returns default ordering window size.
|
Chris@16
|
104 * The default window size is specific to the operating system thread scheduling mechanism.
|
Chris@16
|
105 */
|
Chris@16
|
106 static posix_time::time_duration get_default_ordering_window()
|
Chris@16
|
107 {
|
Chris@16
|
108 // The main idea behind this parameter is that the ordering window should be large enough
|
Chris@16
|
109 // to allow the frontend to order records from different threads on an attribute
|
Chris@16
|
110 // that contains system time. Thus this value should be:
|
Chris@16
|
111 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
|
Chris@16
|
112 // For instance, on Windows it defaults to around 15-16 ms.
|
Chris@16
|
113 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
|
Chris@16
|
114 // switch threads on any known OS. It can be tuned for other platforms as needed.
|
Chris@16
|
115 return posix_time::milliseconds(30);
|
Chris@16
|
116 }
|
Chris@16
|
117
|
Chris@16
|
118 protected:
|
Chris@16
|
119 //! Initializing constructor
|
Chris@16
|
120 template< typename ArgsT >
|
Chris@16
|
121 explicit unbounded_ordering_queue(ArgsT const& args) :
|
Chris@16
|
122 m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
|
Chris@16
|
123 m_queue(args[keywords::order]),
|
Chris@16
|
124 m_interruption_requested(false)
|
Chris@16
|
125 {
|
Chris@16
|
126 }
|
Chris@16
|
127
|
Chris@16
|
128 //! Enqueues log record to the queue
|
Chris@16
|
129 void enqueue(record_view const& rec)
|
Chris@16
|
130 {
|
Chris@16
|
131 lock_guard< mutex_type > lock(m_mutex);
|
Chris@16
|
132 enqueue_unlocked(rec);
|
Chris@16
|
133 }
|
Chris@16
|
134
|
Chris@16
|
135 //! Attempts to enqueue log record to the queue
|
Chris@16
|
136 bool try_enqueue(record_view const& rec)
|
Chris@16
|
137 {
|
Chris@16
|
138 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
|
Chris@16
|
139 if (lock.owns_lock())
|
Chris@16
|
140 {
|
Chris@16
|
141 enqueue_unlocked(rec);
|
Chris@16
|
142 return true;
|
Chris@16
|
143 }
|
Chris@16
|
144 else
|
Chris@16
|
145 return false;
|
Chris@16
|
146 }
|
Chris@16
|
147
|
Chris@16
|
148 //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed
|
Chris@16
|
149 bool try_dequeue_ready(record_view& rec)
|
Chris@16
|
150 {
|
Chris@16
|
151 lock_guard< mutex_type > lock(m_mutex);
|
Chris@16
|
152 if (!m_queue.empty())
|
Chris@16
|
153 {
|
Chris@16
|
154 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
|
Chris@16
|
155 enqueued_record const& elem = m_queue.top();
|
Chris@16
|
156 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
|
Chris@16
|
157 {
|
Chris@16
|
158 // We got a new element
|
Chris@16
|
159 rec = elem.m_record;
|
Chris@16
|
160 m_queue.pop();
|
Chris@16
|
161 return true;
|
Chris@16
|
162 }
|
Chris@16
|
163 }
|
Chris@16
|
164
|
Chris@16
|
165 return false;
|
Chris@16
|
166 }
|
Chris@16
|
167
|
Chris@16
|
168 //! Attempts to dequeue log record from the queue, does not block.
|
Chris@16
|
169 bool try_dequeue(record_view& rec)
|
Chris@16
|
170 {
|
Chris@16
|
171 lock_guard< mutex_type > lock(m_mutex);
|
Chris@16
|
172 if (!m_queue.empty())
|
Chris@16
|
173 {
|
Chris@16
|
174 enqueued_record const& elem = m_queue.top();
|
Chris@16
|
175 rec = elem.m_record;
|
Chris@16
|
176 m_queue.pop();
|
Chris@16
|
177 return true;
|
Chris@16
|
178 }
|
Chris@16
|
179
|
Chris@16
|
180 return false;
|
Chris@16
|
181 }
|
Chris@16
|
182
|
Chris@16
|
183 //! Dequeues log record from the queue, blocks if no log records are ready to be processed
|
Chris@16
|
184 bool dequeue_ready(record_view& rec)
|
Chris@16
|
185 {
|
Chris@16
|
186 unique_lock< mutex_type > lock(m_mutex);
|
Chris@16
|
187 while (!m_interruption_requested)
|
Chris@16
|
188 {
|
Chris@16
|
189 if (!m_queue.empty())
|
Chris@16
|
190 {
|
Chris@16
|
191 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
|
Chris@16
|
192 enqueued_record const& elem = m_queue.top();
|
Chris@16
|
193 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
|
Chris@16
|
194 if (difference >= m_ordering_window)
|
Chris@16
|
195 {
|
Chris@16
|
196 // We got a new element
|
Chris@16
|
197 rec = elem.m_record;
|
Chris@16
|
198 m_queue.pop();
|
Chris@16
|
199 return true;
|
Chris@16
|
200 }
|
Chris@16
|
201 else
|
Chris@16
|
202 {
|
Chris@16
|
203 // Wait until the element becomes ready to be processed
|
Chris@16
|
204 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
|
Chris@16
|
205 }
|
Chris@16
|
206 }
|
Chris@16
|
207 else
|
Chris@16
|
208 {
|
Chris@16
|
209 // Wait for an element to come
|
Chris@16
|
210 m_cond.wait(lock);
|
Chris@16
|
211 }
|
Chris@16
|
212 }
|
Chris@16
|
213 m_interruption_requested = false;
|
Chris@16
|
214
|
Chris@16
|
215 return false;
|
Chris@16
|
216 }
|
Chris@16
|
217
|
Chris@16
|
218 //! Wakes a thread possibly blocked in the \c dequeue method
|
Chris@16
|
219 void interrupt_dequeue()
|
Chris@16
|
220 {
|
Chris@16
|
221 lock_guard< mutex_type > lock(m_mutex);
|
Chris@16
|
222 m_interruption_requested = true;
|
Chris@16
|
223 m_cond.notify_one();
|
Chris@16
|
224 }
|
Chris@16
|
225
|
Chris@16
|
226 private:
|
Chris@16
|
227 //! Enqueues a log record
|
Chris@16
|
228 void enqueue_unlocked(record_view const& rec)
|
Chris@16
|
229 {
|
Chris@16
|
230 const bool was_empty = m_queue.empty();
|
Chris@16
|
231 m_queue.push(enqueued_record(rec));
|
Chris@16
|
232 if (was_empty)
|
Chris@16
|
233 m_cond.notify_one();
|
Chris@16
|
234 }
|
Chris@16
|
235 };
|
Chris@16
|
236
|
Chris@16
|
237 } // namespace sinks
|
Chris@16
|
238
|
Chris@16
|
239 BOOST_LOG_CLOSE_NAMESPACE // namespace log
|
Chris@16
|
240
|
Chris@16
|
241 } // namespace boost
|
Chris@16
|
242
|
Chris@16
|
243 #include <boost/log/detail/footer.hpp>
|
Chris@16
|
244
|
Chris@16
|
245 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
|