annotate DEPENDENCIES/generic/include/boost/log/sinks/async_frontend.hpp @ 69:300f9fa4b454

Restore erroneous subrepo
author Chris Cannam
date Thu, 16 Oct 2014 14:41:48 +0100
parents 2665513ce2d3
children c530137014c0
rev   line source
Chris@16 1 /*
Chris@16 2 * Copyright Andrey Semashev 2007 - 2013.
Chris@16 3 * Distributed under the Boost Software License, Version 1.0.
Chris@16 4 * (See accompanying file LICENSE_1_0.txt or copy at
Chris@16 5 * http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6 */
Chris@16 7 /*!
Chris@16 8 * \file async_frontend.hpp
Chris@16 9 * \author Andrey Semashev
Chris@16 10 * \date 14.07.2009
Chris@16 11 *
Chris@16 12 * The header contains implementation of asynchronous sink frontend.
Chris@16 13 */
Chris@16 14
Chris@16 15 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
Chris@16 16 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
Chris@16 17
Chris@16 18 #include <boost/log/detail/config.hpp>
Chris@16 19
Chris@16 20 #ifdef BOOST_HAS_PRAGMA_ONCE
Chris@16 21 #pragma once
Chris@16 22 #endif
Chris@16 23
Chris@16 24 #if defined(BOOST_LOG_NO_THREADS)
Chris@16 25 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
Chris@16 26 #endif
Chris@16 27
Chris@16 28 #include <boost/bind.hpp>
Chris@16 29 #include <boost/static_assert.hpp>
Chris@16 30 #include <boost/smart_ptr/shared_ptr.hpp>
Chris@16 31 #include <boost/smart_ptr/make_shared_object.hpp>
Chris@16 32 #include <boost/thread/locks.hpp>
Chris@16 33 #include <boost/thread/mutex.hpp>
Chris@16 34 #include <boost/thread/thread.hpp>
Chris@16 35 #include <boost/thread/condition_variable.hpp>
Chris@16 36 #include <boost/log/exceptions.hpp>
Chris@16 37 #include <boost/log/detail/locking_ptr.hpp>
Chris@16 38 #include <boost/log/detail/parameter_tools.hpp>
Chris@16 39 #include <boost/log/core/record_view.hpp>
Chris@16 40 #include <boost/log/sinks/basic_sink_frontend.hpp>
Chris@16 41 #include <boost/log/sinks/frontend_requirements.hpp>
Chris@16 42 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
Chris@16 43 #include <boost/log/keywords/start_thread.hpp>
Chris@16 44 #include <boost/log/detail/header.hpp>
Chris@16 45
Chris@16 46 namespace boost {
Chris@16 47
Chris@16 48 BOOST_LOG_OPEN_NAMESPACE
Chris@16 49
Chris@16 50 namespace sinks {
Chris@16 51
Chris@16 52 #ifndef BOOST_LOG_DOXYGEN_PASS
Chris@16 53
// Generator macro for the parametrized asynchronous_sink constructors. For a given arity n
// it expands to two constructor templates taking n arbitrary arguments by const reference:
//
//   1. (arg0, ..., argN-1)          - the backend is created with make_shared from the arguments;
//   2. (backend, arg0, ..., argN-1) - the user-provided backend pointer is adopted.
//
// In both cases the same arguments, joined with commas inside one pair of parentheses, are
// also given to the queueing strategy base and are looked up for the start_thread keyword
// (defaulting to true) to decide whether the dedicated feeding thread is spawned.
// NOTE(review): the comma-joined expression is presumably combined into a single named
// argument pack by the keyword argument types - confirm against Boost.Parameter.
// The z and types parameters are not used by the expansion.
#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, types)\
    template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
    explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
        base_type(true),\
        queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
        m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }\
    template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
    explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
        base_type(true),\
        queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
        m_pBackend(backend),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }
Chris@16 77
Chris@16 78 #endif // BOOST_LOG_DOXYGEN_PASS
Chris@16 79
Chris@16 80 /*!
Chris@16 81 * \brief Asynchronous logging sink frontend
Chris@16 82 *
Chris@16 83 * The frontend starts a separate thread on construction. All logging records are passed
Chris@16 84 * to the backend in this dedicated thread only.
Chris@16 85 */
Chris@16 86 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
Chris@16 87 class asynchronous_sink :
Chris@16 88 public aux::make_sink_frontend_base< SinkBackendT >::type,
Chris@16 89 private boost::log::aux::locking_ptr_counter_base,
Chris@16 90 public QueueingStrategyT
Chris@16 91 {
Chris@16 92 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
Chris@16 93 typedef QueueingStrategyT queue_base_type;
Chris@16 94
Chris@16 95 private:
Chris@16 96 //! Backend synchronization mutex type
Chris@16 97 typedef boost::mutex backend_mutex_type;
Chris@16 98 //! Frontend synchronization mutex type
Chris@16 99 typedef typename base_type::mutex_type frontend_mutex_type;
Chris@16 100
Chris@16 101 //! A scope guard that implements thread ID management
Chris@16 102 class scoped_thread_id
Chris@16 103 {
Chris@16 104 private:
Chris@16 105 frontend_mutex_type& m_Mutex;
Chris@16 106 condition_variable_any& m_Cond;
Chris@16 107 thread::id& m_ThreadID;
Chris@16 108 bool volatile& m_StopRequested;
Chris@16 109
Chris@16 110 public:
Chris@16 111 //! Initializing constructor
Chris@16 112 scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, bool volatile& sr)
Chris@16 113 : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
Chris@16 114 {
Chris@16 115 lock_guard< frontend_mutex_type > lock(m_Mutex);
Chris@16 116 if (m_ThreadID != thread::id())
Chris@16 117 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
Chris@16 118 m_ThreadID = this_thread::get_id();
Chris@16 119 }
Chris@16 120 //! Initializing constructor
Chris@16 121 scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, bool volatile& sr)
Chris@16 122 : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
Chris@16 123 {
Chris@16 124 unique_lock< frontend_mutex_type > lock(move(l));
Chris@16 125 if (m_ThreadID != thread::id())
Chris@16 126 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
Chris@16 127 m_ThreadID = this_thread::get_id();
Chris@16 128 }
Chris@16 129 //! Destructor
Chris@16 130 ~scoped_thread_id()
Chris@16 131 {
Chris@16 132 try
Chris@16 133 {
Chris@16 134 lock_guard< frontend_mutex_type > lock(m_Mutex);
Chris@16 135 m_StopRequested = false;
Chris@16 136 m_ThreadID = thread::id();
Chris@16 137 m_Cond.notify_all();
Chris@16 138 }
Chris@16 139 catch (...)
Chris@16 140 {
Chris@16 141 }
Chris@16 142 }
Chris@16 143
Chris@16 144 private:
Chris@16 145 scoped_thread_id(scoped_thread_id const&);
Chris@16 146 scoped_thread_id& operator= (scoped_thread_id const&);
Chris@16 147 };
Chris@16 148
Chris@16 149 //! A scope guard that resets a flag on destructor
Chris@16 150 class scoped_flag
Chris@16 151 {
Chris@16 152 private:
Chris@16 153 frontend_mutex_type& m_Mutex;
Chris@16 154 condition_variable_any& m_Cond;
Chris@16 155 volatile bool& m_Flag;
Chris@16 156
Chris@16 157 public:
Chris@16 158 explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, volatile bool& f) :
Chris@16 159 m_Mutex(mut), m_Cond(cond), m_Flag(f)
Chris@16 160 {
Chris@16 161 }
Chris@16 162 ~scoped_flag()
Chris@16 163 {
Chris@16 164 try
Chris@16 165 {
Chris@16 166 lock_guard< frontend_mutex_type > lock(m_Mutex);
Chris@16 167 m_Flag = false;
Chris@16 168 m_Cond.notify_all();
Chris@16 169 }
Chris@16 170 catch (...)
Chris@16 171 {
Chris@16 172 }
Chris@16 173 }
Chris@16 174
Chris@16 175 private:
Chris@16 176 scoped_flag(scoped_flag const&);
Chris@16 177 scoped_flag& operator= (scoped_flag const&);
Chris@16 178 };
Chris@16 179
Chris@16 180 public:
Chris@16 181 //! Sink implementation type
Chris@16 182 typedef SinkBackendT sink_backend_type;
Chris@16 183 //! \cond
Chris@16 184 BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
Chris@16 185 //! \endcond
Chris@16 186
Chris@16 187 #ifndef BOOST_LOG_DOXYGEN_PASS
Chris@16 188
Chris@16 189 //! A pointer type that locks the backend until it's destroyed
Chris@16 190 typedef boost::log::aux::locking_ptr< sink_backend_type > locked_backend_ptr;
Chris@16 191
Chris@16 192 #else // BOOST_LOG_DOXYGEN_PASS
Chris@16 193
Chris@16 194 //! A pointer type that locks the backend until it's destroyed
Chris@16 195 typedef implementation_defined locked_backend_ptr;
Chris@16 196
Chris@16 197 #endif // BOOST_LOG_DOXYGEN_PASS
Chris@16 198
Chris@16 199 private:
Chris@16 200 //! Synchronization mutex
Chris@16 201 backend_mutex_type m_BackendMutex;
Chris@16 202 //! Pointer to the backend
Chris@16 203 const shared_ptr< sink_backend_type > m_pBackend;
Chris@16 204
Chris@16 205 //! Dedicated record feeding thread
Chris@16 206 thread m_DedicatedFeedingThread;
Chris@16 207 //! Feeding thread ID
Chris@16 208 thread::id m_FeedingThreadID;
Chris@16 209 //! Condition variable to implement blocking operations
Chris@16 210 condition_variable_any m_BlockCond;
Chris@16 211
Chris@16 212 //! The flag indicates that the feeding loop has to be stopped
Chris@16 213 volatile bool m_StopRequested; // TODO: make it a real atomic
Chris@16 214 //! The flag indicates that queue flush has been requested
Chris@16 215 volatile bool m_FlushRequested; // TODO: make it a real atomic
Chris@16 216
Chris@16 217 public:
Chris@16 218 /*!
Chris@16 219 * Default constructor. Constructs the sink backend instance.
Chris@16 220 * Requires the backend to be default-constructible.
Chris@16 221 *
Chris@16 222 * \param start_thread If \c true, the frontend creates a thread to feed
Chris@16 223 * log records to the backend. Otherwise no thread is
Chris@16 224 * started and it is assumed that the user will call
Chris@16 225 * either \c run or \c feed_records himself.
Chris@16 226 */
Chris@16 227 asynchronous_sink(bool start_thread = true) :
Chris@16 228 base_type(true),
Chris@16 229 m_pBackend(boost::make_shared< sink_backend_type >()),
Chris@16 230 m_StopRequested(false),
Chris@16 231 m_FlushRequested(false)
Chris@16 232 {
Chris@16 233 if (start_thread)
Chris@16 234 start_feeding_thread();
Chris@16 235 }
Chris@16 236 /*!
Chris@16 237 * Constructor attaches user-constructed backend instance
Chris@16 238 *
Chris@16 239 * \param backend Pointer to the backend instance.
Chris@16 240 * \param start_thread If \c true, the frontend creates a thread to feed
Chris@16 241 * log records to the backend. Otherwise no thread is
Chris@16 242 * started and it is assumed that the user will call
Chris@16 243 * either \c run or \c feed_records himself.
Chris@16 244 *
Chris@16 245 * \pre \a backend is not \c NULL.
Chris@16 246 */
Chris@16 247 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
Chris@16 248 base_type(true),
Chris@16 249 m_pBackend(backend),
Chris@16 250 m_StopRequested(false),
Chris@16 251 m_FlushRequested(false)
Chris@16 252 {
Chris@16 253 if (start_thread)
Chris@16 254 start_feeding_thread();
Chris@16 255 }
Chris@16 256
Chris@16 257 // Constructors that pass arbitrary parameters to the backend constructor
Chris@16 258 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
Chris@16 259
Chris@16 260 /*!
Chris@16 261 * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
Chris@16 262 */
Chris@16 263 ~asynchronous_sink()
Chris@16 264 {
Chris@16 265 boost::this_thread::disable_interruption no_interrupts;
Chris@16 266 stop();
Chris@16 267 }
Chris@16 268
Chris@16 269 /*!
Chris@16 270 * Locking accessor to the attached backend
Chris@16 271 */
Chris@16 272 locked_backend_ptr locked_backend()
Chris@16 273 {
Chris@16 274 return locked_backend_ptr(
Chris@16 275 m_pBackend,
Chris@16 276 static_cast< boost::log::aux::locking_ptr_counter_base& >(*this));
Chris@16 277 }
Chris@16 278
Chris@16 279 /*!
Chris@16 280 * Enqueues the log record to the backend
Chris@16 281 */
Chris@16 282 void consume(record_view const& rec)
Chris@16 283 {
Chris@16 284 if (m_FlushRequested)
Chris@16 285 {
Chris@16 286 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
Chris@16 287 // Wait until flush is done
Chris@16 288 while (m_FlushRequested)
Chris@16 289 m_BlockCond.wait(lock);
Chris@16 290 }
Chris@16 291 queue_base_type::enqueue(rec);
Chris@16 292 }
Chris@16 293
Chris@16 294 /*!
Chris@16 295 * The method attempts to pass logging record to the backend
Chris@16 296 */
Chris@16 297 bool try_consume(record_view const& rec)
Chris@16 298 {
Chris@16 299 if (!m_FlushRequested)
Chris@16 300 {
Chris@16 301 return queue_base_type::try_enqueue(rec);
Chris@16 302 }
Chris@16 303 else
Chris@16 304 return false;
Chris@16 305 }
Chris@16 306
Chris@16 307 /*!
Chris@16 308 * The method starts record feeding loop and effectively blocks until either of this happens:
Chris@16 309 *
Chris@16 310 * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
Chris@16 311 * \li an exception is thrown while processing a log record in the backend, and the exception is
Chris@16 312 * not terminated by the exception handler, if one is installed
Chris@16 313 *
Chris@16 314 * \pre The sink frontend must be constructed without spawning a dedicated thread
Chris@16 315 */
Chris@16 316 void run()
Chris@16 317 {
Chris@16 318 // First check that no other thread is running
Chris@16 319 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
Chris@16 320
Chris@16 321 // Now start the feeding loop
Chris@16 322 while (true)
Chris@16 323 {
Chris@16 324 do_feed_records();
Chris@16 325 if (!m_StopRequested)
Chris@16 326 {
Chris@16 327 // Block until new record is available
Chris@16 328 record_view rec;
Chris@16 329 if (queue_base_type::dequeue_ready(rec))
Chris@16 330 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
Chris@16 331 }
Chris@16 332 else
Chris@16 333 break;
Chris@16 334 }
Chris@16 335 }
Chris@16 336
Chris@16 337 /*!
Chris@16 338 * The method softly interrupts record feeding loop. This method must be called when the \c run
Chris@16 339 * method execution has to be interrupted. Unlike regular thread interruption, calling
Chris@16 340 * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend
Chris@16 341 * will attempt to finish its business with the record in progress and return afterwards.
Chris@16 342 * This method can be called either if the sink was created with a dedicated thread,
Chris@16 343 * or if the feeding loop was initiated by user.
Chris@16 344 *
Chris@16 345 * \note Returning from this method does not guarantee that there are no records left buffered
Chris@16 346 * in the sink frontend. It is possible that log records keep coming during and after this
Chris@16 347 * method is called. At some point of execution of this method log records stop being processed,
Chris@16 348 * and all records that come after this point are put into the queue. These records will be
Chris@16 349 * processed upon further calls to \c run or \c feed_records.
Chris@16 350 */
Chris@16 351 void stop()
Chris@16 352 {
Chris@16 353 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
Chris@16 354 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
Chris@16 355 {
Chris@16 356 try
Chris@16 357 {
Chris@16 358 m_StopRequested = true;
Chris@16 359 queue_base_type::interrupt_dequeue();
Chris@16 360 while (m_StopRequested)
Chris@16 361 m_BlockCond.wait(lock);
Chris@16 362 }
Chris@16 363 catch (...)
Chris@16 364 {
Chris@16 365 m_StopRequested = false;
Chris@16 366 throw;
Chris@16 367 }
Chris@16 368
Chris@16 369 lock.unlock();
Chris@16 370 m_DedicatedFeedingThread.join();
Chris@16 371 }
Chris@16 372 }
Chris@16 373
Chris@16 374 /*!
Chris@16 375 * The method feeds log records that may have been buffered to the backend and returns
Chris@16 376 *
Chris@16 377 * \pre The sink frontend must be constructed without spawning a dedicated thread
Chris@16 378 */
Chris@16 379 void feed_records()
Chris@16 380 {
Chris@16 381 // First check that no other thread is running
Chris@16 382 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
Chris@16 383
Chris@16 384 // Now start the feeding loop
Chris@16 385 do_feed_records();
Chris@16 386 }
Chris@16 387
Chris@16 388 /*!
Chris@16 389 * The method feeds all log records that may have been buffered to the backend and returns.
Chris@16 390 * Unlike \c feed_records, in case of ordering queueing the method also feeds records
Chris@16 391 * that were enqueued during the ordering window, attempting to empty the queue completely.
Chris@16 392 *
Chris@16 393 * \pre The sink frontend must be constructed without spawning a dedicated thread
Chris@16 394 */
Chris@16 395 void flush()
Chris@16 396 {
Chris@16 397 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
Chris@16 398 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
Chris@16 399 {
Chris@16 400 // There is already a thread feeding records, let it do the job
Chris@16 401 m_FlushRequested = true;
Chris@16 402 queue_base_type::interrupt_dequeue();
Chris@16 403 while (!m_StopRequested && m_FlushRequested)
Chris@16 404 m_BlockCond.wait(lock);
Chris@16 405
Chris@16 406 // The condition may have been signalled when the feeding thread was finishing.
Chris@16 407 // In that case records may not have been flushed, and we do the flush ourselves.
Chris@16 408 if (m_FeedingThreadID != thread::id())
Chris@16 409 return;
Chris@16 410 }
Chris@16 411
Chris@16 412 m_FlushRequested = true;
Chris@16 413
Chris@16 414 // Flush records ourselves. The guard releases the lock.
Chris@16 415 scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested);
Chris@16 416
Chris@16 417 do_feed_records();
Chris@16 418 }
Chris@16 419
Chris@16 420 private:
Chris@16 421 #ifndef BOOST_LOG_DOXYGEN_PASS
Chris@16 422 //! The method spawns record feeding thread
Chris@16 423 void start_feeding_thread()
Chris@16 424 {
Chris@16 425 boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread);
Chris@16 426 }
Chris@16 427
Chris@16 428 // locking_ptr_counter_base methods
Chris@16 429 void lock() { m_BackendMutex.lock(); }
Chris@16 430 bool try_lock() { return m_BackendMutex.try_lock(); }
Chris@16 431 void unlock() { m_BackendMutex.unlock(); }
Chris@16 432
Chris@16 433 //! The record feeding loop
Chris@16 434 void do_feed_records()
Chris@16 435 {
Chris@16 436 while (!m_StopRequested)
Chris@16 437 {
Chris@16 438 record_view rec;
Chris@16 439 register bool dequeued = false;
Chris@16 440 if (!m_FlushRequested)
Chris@16 441 dequeued = queue_base_type::try_dequeue_ready(rec);
Chris@16 442 else
Chris@16 443 dequeued = queue_base_type::try_dequeue(rec);
Chris@16 444
Chris@16 445 if (dequeued)
Chris@16 446 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
Chris@16 447 else
Chris@16 448 break;
Chris@16 449 }
Chris@16 450
Chris@16 451 if (m_FlushRequested)
Chris@16 452 {
Chris@16 453 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
Chris@16 454 base_type::flush_backend(m_BackendMutex, *m_pBackend);
Chris@16 455 }
Chris@16 456 }
Chris@16 457 #endif // BOOST_LOG_DOXYGEN_PASS
Chris@16 458 };
Chris@16 459
Chris@16 460 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
Chris@16 461
Chris@16 462 } // namespace sinks
Chris@16 463
Chris@16 464 BOOST_LOG_CLOSE_NAMESPACE // namespace log
Chris@16 465
Chris@16 466 } // namespace boost
Chris@16 467
Chris@16 468 #include <boost/log/detail/footer.hpp>
Chris@16 469
Chris@16 470 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_