comparison DEPENDENCIES/generic/include/boost/log/sinks/async_frontend.hpp @ 16:2665513ce2d3

Add boost headers
author Chris Cannam
date Tue, 05 Aug 2014 11:11:38 +0100
parents
children c530137014c0
comparison
equal deleted inserted replaced
15:663ca0da4350 16:2665513ce2d3
1 /*
2 * Copyright Andrey Semashev 2007 - 2013.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7 /*!
8 * \file async_frontend.hpp
9 * \author Andrey Semashev
10 * \date 14.07.2009
11 *
12 * The header contains implementation of asynchronous sink frontend.
13 */
14
15 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
16 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
17
18 #include <boost/log/detail/config.hpp>
19
20 #ifdef BOOST_HAS_PRAGMA_ONCE
21 #pragma once
22 #endif
23
24 #if defined(BOOST_LOG_NO_THREADS)
25 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
26 #endif
27
28 #include <boost/bind.hpp>
29 #include <boost/static_assert.hpp>
30 #include <boost/smart_ptr/shared_ptr.hpp>
31 #include <boost/smart_ptr/make_shared_object.hpp>
32 #include <boost/thread/locks.hpp>
33 #include <boost/thread/mutex.hpp>
34 #include <boost/thread/thread.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #include <boost/log/exceptions.hpp>
37 #include <boost/log/detail/locking_ptr.hpp>
38 #include <boost/log/detail/parameter_tools.hpp>
39 #include <boost/log/core/record_view.hpp>
40 #include <boost/log/sinks/basic_sink_frontend.hpp>
41 #include <boost/log/sinks/frontend_requirements.hpp>
42 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
43 #include <boost/log/keywords/start_thread.hpp>
44 #include <boost/log/detail/header.hpp>
45
46 namespace boost {
47
48 BOOST_LOG_OPEN_NAMESPACE
49
50 namespace sinks {
51
52 #ifndef BOOST_LOG_DOXYGEN_PASS
53
// Generator macro for the parametrized constructors of asynchronous_sink.
// For a given arity n it emits two constructor templates, each taking n
// arguments by const reference:
//   1. a constructor that creates the backend itself with boost::make_shared,
//      passing all n arguments to the backend constructor;
//   2. a constructor that additionally takes a shared_ptr to an already
//      constructed backend as its first parameter and stores it.
// Both constructors pass the parenthesized comma expression (arg0, arg1, ...)
// to the queueing strategy base, clear the stop/flush flags, and call
// start_feeding_thread() when the expression
// (arg0, arg1, ...)[keywords::start_thread | true] evaluates to true.
// NOTE(review): the comma expression presumably relies on an overloaded comma
// operator that combines named arguments into one argument pack, with "| true"
// supplying the default for start_thread - confirm against
// boost/log/detail/parameter_tools.hpp.
// The macro parameters z and types are not used by the expansion.
// The macro is consumed by BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN inside the
// class body and is #undef'ed after the class definition.
54 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, types)\
55 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
56 explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
57 base_type(true),\
58 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
59 m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\
60 m_StopRequested(false),\
61 m_FlushRequested(false)\
62 {\
63 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
64 start_feeding_thread();\
65 }\
66 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
67 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
68 base_type(true),\
69 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
70 m_pBackend(backend),\
71 m_StopRequested(false),\
72 m_FlushRequested(false)\
73 {\
74 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
75 start_feeding_thread();\
76 }
77
78 #endif // BOOST_LOG_DOXYGEN_PASS
79
80 /*!
81 * \brief Asynchronous logging sink frontend
82 *
83 * The frontend starts a separate thread on construction. All logging records are passed
84 * to the backend in this dedicated thread only.
85 */
86 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
87 class asynchronous_sink :
88 public aux::make_sink_frontend_base< SinkBackendT >::type,
89 private boost::log::aux::locking_ptr_counter_base,
90 public QueueingStrategyT
91 {
92 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
93 typedef QueueingStrategyT queue_base_type;
94
95 private:
96 //! Backend synchronization mutex type
97 typedef boost::mutex backend_mutex_type;
98 //! Frontend synchronization mutex type
99 typedef typename base_type::mutex_type frontend_mutex_type;
100
101 //! A scope guard that implements thread ID management
102 class scoped_thread_id
103 {
104 private:
105 frontend_mutex_type& m_Mutex;
106 condition_variable_any& m_Cond;
107 thread::id& m_ThreadID;
108 bool volatile& m_StopRequested;
109
110 public:
111 //! Initializing constructor
112 scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, bool volatile& sr)
113 : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
114 {
115 lock_guard< frontend_mutex_type > lock(m_Mutex);
116 if (m_ThreadID != thread::id())
117 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
118 m_ThreadID = this_thread::get_id();
119 }
120 //! Initializing constructor
121 scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, bool volatile& sr)
122 : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
123 {
124 unique_lock< frontend_mutex_type > lock(move(l));
125 if (m_ThreadID != thread::id())
126 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
127 m_ThreadID = this_thread::get_id();
128 }
129 //! Destructor
130 ~scoped_thread_id()
131 {
132 try
133 {
134 lock_guard< frontend_mutex_type > lock(m_Mutex);
135 m_StopRequested = false;
136 m_ThreadID = thread::id();
137 m_Cond.notify_all();
138 }
139 catch (...)
140 {
141 }
142 }
143
144 private:
145 scoped_thread_id(scoped_thread_id const&);
146 scoped_thread_id& operator= (scoped_thread_id const&);
147 };
148
149 //! A scope guard that resets a flag on destructor
150 class scoped_flag
151 {
152 private:
153 frontend_mutex_type& m_Mutex;
154 condition_variable_any& m_Cond;
155 volatile bool& m_Flag;
156
157 public:
158 explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, volatile bool& f) :
159 m_Mutex(mut), m_Cond(cond), m_Flag(f)
160 {
161 }
162 ~scoped_flag()
163 {
164 try
165 {
166 lock_guard< frontend_mutex_type > lock(m_Mutex);
167 m_Flag = false;
168 m_Cond.notify_all();
169 }
170 catch (...)
171 {
172 }
173 }
174
175 private:
176 scoped_flag(scoped_flag const&);
177 scoped_flag& operator= (scoped_flag const&);
178 };
179
180 public:
181 //! Sink implementation type
182 typedef SinkBackendT sink_backend_type;
183 //! \cond
184 BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
185 //! \endcond
186
187 #ifndef BOOST_LOG_DOXYGEN_PASS
188
189 //! A pointer type that locks the backend until it's destroyed
190 typedef boost::log::aux::locking_ptr< sink_backend_type > locked_backend_ptr;
191
192 #else // BOOST_LOG_DOXYGEN_PASS
193
194 //! A pointer type that locks the backend until it's destroyed
195 typedef implementation_defined locked_backend_ptr;
196
197 #endif // BOOST_LOG_DOXYGEN_PASS
198
199 private:
200 //! Synchronization mutex
201 backend_mutex_type m_BackendMutex;
202 //! Pointer to the backend
203 const shared_ptr< sink_backend_type > m_pBackend;
204
205 //! Dedicated record feeding thread
206 thread m_DedicatedFeedingThread;
207 //! Feeding thread ID
208 thread::id m_FeedingThreadID;
209 //! Condition variable to implement blocking operations
210 condition_variable_any m_BlockCond;
211
212 //! The flag indicates that the feeding loop has to be stopped
213 volatile bool m_StopRequested; // TODO: make it a real atomic
214 //! The flag indicates that queue flush has been requested
215 volatile bool m_FlushRequested; // TODO: make it a real atomic
216
217 public:
218 /*!
219 * Default constructor. Constructs the sink backend instance.
220 * Requires the backend to be default-constructible.
221 *
222 * \param start_thread If \c true, the frontend creates a thread to feed
223 * log records to the backend. Otherwise no thread is
224 * started and it is assumed that the user will call
225 * either \c run or \c feed_records himself.
226 */
227 asynchronous_sink(bool start_thread = true) :
228 base_type(true),
229 m_pBackend(boost::make_shared< sink_backend_type >()),
230 m_StopRequested(false),
231 m_FlushRequested(false)
232 {
233 if (start_thread)
234 start_feeding_thread();
235 }
236 /*!
237 * Constructor attaches user-constructed backend instance
238 *
239 * \param backend Pointer to the backend instance.
240 * \param start_thread If \c true, the frontend creates a thread to feed
241 * log records to the backend. Otherwise no thread is
242 * started and it is assumed that the user will call
243 * either \c run or \c feed_records himself.
244 *
245 * \pre \a backend is not \c NULL.
246 */
247 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
248 base_type(true),
249 m_pBackend(backend),
250 m_StopRequested(false),
251 m_FlushRequested(false)
252 {
253 if (start_thread)
254 start_feeding_thread();
255 }
256
257 // Constructors that pass arbitrary parameters to the backend constructor
258 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
259
260 /*!
261 * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
262 */
263 ~asynchronous_sink()
264 {
265 boost::this_thread::disable_interruption no_interrupts;
266 stop();
267 }
268
269 /*!
270 * Locking accessor to the attached backend
271 */
272 locked_backend_ptr locked_backend()
273 {
274 return locked_backend_ptr(
275 m_pBackend,
276 static_cast< boost::log::aux::locking_ptr_counter_base& >(*this));
277 }
278
279 /*!
280 * Enqueues the log record to the backend
281 */
282 void consume(record_view const& rec)
283 {
284 if (m_FlushRequested)
285 {
286 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
287 // Wait until flush is done
288 while (m_FlushRequested)
289 m_BlockCond.wait(lock);
290 }
291 queue_base_type::enqueue(rec);
292 }
293
294 /*!
295 * The method attempts to pass logging record to the backend
296 */
297 bool try_consume(record_view const& rec)
298 {
299 if (!m_FlushRequested)
300 {
301 return queue_base_type::try_enqueue(rec);
302 }
303 else
304 return false;
305 }
306
307 /*!
308 * The method starts record feeding loop and effectively blocks until either of this happens:
309 *
310 * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
311 * \li an exception is thrown while processing a log record in the backend, and the exception is
312 * not terminated by the exception handler, if one is installed
313 *
314 * \pre The sink frontend must be constructed without spawning a dedicated thread
315 */
316 void run()
317 {
318 // First check that no other thread is running
319 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
320
321 // Now start the feeding loop
322 while (true)
323 {
324 do_feed_records();
325 if (!m_StopRequested)
326 {
327 // Block until new record is available
328 record_view rec;
329 if (queue_base_type::dequeue_ready(rec))
330 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
331 }
332 else
333 break;
334 }
335 }
336
337 /*!
338 * The method softly interrupts record feeding loop. This method must be called when the \c run
339 * method execution has to be interrupted. Unlike regular thread interruption, calling
340 * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend
341 * will attempt to finish its business with the record in progress and return afterwards.
342 * This method can be called either if the sink was created with a dedicated thread,
343 * or if the feeding loop was initiated by user.
344 *
345 * \note Returning from this method does not guarantee that there are no records left buffered
346 * in the sink frontend. It is possible that log records keep coming during and after this
347 * method is called. At some point of execution of this method log records stop being processed,
348 * and all records that come after this point are put into the queue. These records will be
349 * processed upon further calls to \c run or \c feed_records.
350 */
351 void stop()
352 {
353 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
354 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
355 {
356 try
357 {
358 m_StopRequested = true;
359 queue_base_type::interrupt_dequeue();
360 while (m_StopRequested)
361 m_BlockCond.wait(lock);
362 }
363 catch (...)
364 {
365 m_StopRequested = false;
366 throw;
367 }
368
369 lock.unlock();
370 m_DedicatedFeedingThread.join();
371 }
372 }
373
374 /*!
375 * The method feeds log records that may have been buffered to the backend and returns
376 *
377 * \pre The sink frontend must be constructed without spawning a dedicated thread
378 */
379 void feed_records()
380 {
381 // First check that no other thread is running
382 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
383
384 // Now start the feeding loop
385 do_feed_records();
386 }
387
388 /*!
389 * The method feeds all log records that may have been buffered to the backend and returns.
390 * Unlike \c feed_records, in case of ordering queueing the method also feeds records
391 * that were enqueued during the ordering window, attempting to empty the queue completely.
392 *
393 * \pre The sink frontend must be constructed without spawning a dedicated thread
394 */
395 void flush()
396 {
397 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
398 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
399 {
400 // There is already a thread feeding records, let it do the job
401 m_FlushRequested = true;
402 queue_base_type::interrupt_dequeue();
403 while (!m_StopRequested && m_FlushRequested)
404 m_BlockCond.wait(lock);
405
406 // The condition may have been signalled when the feeding thread was finishing.
407 // In that case records may not have been flushed, and we do the flush ourselves.
408 if (m_FeedingThreadID != thread::id())
409 return;
410 }
411
412 m_FlushRequested = true;
413
414 // Flush records ourselves. The guard releases the lock.
415 scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested);
416
417 do_feed_records();
418 }
419
420 private:
421 #ifndef BOOST_LOG_DOXYGEN_PASS
422 //! The method spawns record feeding thread
423 void start_feeding_thread()
424 {
425 boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread);
426 }
427
428 // locking_ptr_counter_base methods
429 void lock() { m_BackendMutex.lock(); }
430 bool try_lock() { return m_BackendMutex.try_lock(); }
431 void unlock() { m_BackendMutex.unlock(); }
432
433 //! The record feeding loop
434 void do_feed_records()
435 {
436 while (!m_StopRequested)
437 {
438 record_view rec;
439 register bool dequeued = false;
440 if (!m_FlushRequested)
441 dequeued = queue_base_type::try_dequeue_ready(rec);
442 else
443 dequeued = queue_base_type::try_dequeue(rec);
444
445 if (dequeued)
446 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
447 else
448 break;
449 }
450
451 if (m_FlushRequested)
452 {
453 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
454 base_type::flush_backend(m_BackendMutex, *m_pBackend);
455 }
456 }
457 #endif // BOOST_LOG_DOXYGEN_PASS
458 };
459
460 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
461
462 } // namespace sinks
463
464 BOOST_LOG_CLOSE_NAMESPACE // namespace log
465
466 } // namespace boost
467
468 #include <boost/log/detail/footer.hpp>
469
470 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_