Chris@16: /* Chris@101: * Copyright Andrey Semashev 2007 - 2015. Chris@16: * Distributed under the Boost Software License, Version 1.0. Chris@16: * (See accompanying file LICENSE_1_0.txt or copy at Chris@16: * http://www.boost.org/LICENSE_1_0.txt) Chris@16: */ Chris@16: /*! Chris@16: * \file async_frontend.hpp Chris@16: * \author Andrey Semashev Chris@16: * \date 14.07.2009 Chris@16: * Chris@16: * The header contains implementation of asynchronous sink frontend. Chris@16: */ Chris@16: Chris@16: #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ Chris@16: #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ Chris@16: Chris@16: #include Chris@16: Chris@16: #ifdef BOOST_HAS_PRAGMA_ONCE Chris@16: #pragma once Chris@16: #endif Chris@16: Chris@16: #if defined(BOOST_LOG_NO_THREADS) Chris@16: #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment Chris@16: #endif Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@101: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: namespace boost { Chris@16: Chris@16: BOOST_LOG_OPEN_NAMESPACE Chris@16: Chris@16: namespace sinks { Chris@16: Chris@16: #ifndef BOOST_LOG_DOXYGEN_PASS Chris@16: Chris@16: #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, types)\ Chris@16: template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ Chris@16: explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ Chris@16: base_type(true),\ Chris@16: queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ Chris@16: m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\ Chris@16: m_StopRequested(false),\ Chris@16: m_FlushRequested(false)\ Chris@16: {\ Chris@16: if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ Chris@16: start_feeding_thread();\ Chris@16: }\ Chris@16: template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ Chris@16: explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ Chris@16: base_type(true),\ Chris@16: queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ Chris@16: m_pBackend(backend),\ Chris@16: m_StopRequested(false),\ Chris@16: m_FlushRequested(false)\ Chris@16: {\ Chris@16: if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ Chris@16: start_feeding_thread();\ Chris@16: } Chris@16: Chris@16: #endif // BOOST_LOG_DOXYGEN_PASS Chris@16: Chris@16: /*! Chris@16: * \brief Asynchronous logging sink frontend Chris@16: * Chris@16: * The frontend starts a separate thread on construction. All logging records are passed Chris@16: * to the backend in this dedicated thread only. Chris@16: */ Chris@16: template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue > Chris@16: class asynchronous_sink : Chris@16: public aux::make_sink_frontend_base< SinkBackendT >::type, Chris@16: public QueueingStrategyT Chris@16: { Chris@16: typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type; Chris@16: typedef QueueingStrategyT queue_base_type; Chris@16: Chris@16: private: Chris@16: //! Backend synchronization mutex type Chris@101: typedef boost::recursive_mutex backend_mutex_type; Chris@16: //! Frontend synchronization mutex type Chris@16: typedef typename base_type::mutex_type frontend_mutex_type; Chris@16: Chris@16: //! A scope guard that implements thread ID management Chris@16: class scoped_thread_id Chris@16: { Chris@16: private: Chris@16: frontend_mutex_type& m_Mutex; Chris@16: condition_variable_any& m_Cond; Chris@16: thread::id& m_ThreadID; Chris@16: bool volatile& m_StopRequested; Chris@16: Chris@16: public: Chris@16: //! Initializing constructor Chris@16: scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, bool volatile& sr) Chris@16: : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) Chris@16: { Chris@16: lock_guard< frontend_mutex_type > lock(m_Mutex); Chris@16: if (m_ThreadID != thread::id()) Chris@16: BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); Chris@16: m_ThreadID = this_thread::get_id(); Chris@16: } Chris@16: //! Initializing constructor Chris@16: scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, bool volatile& sr) Chris@16: : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) Chris@16: { Chris@16: unique_lock< frontend_mutex_type > lock(move(l)); Chris@16: if (m_ThreadID != thread::id()) Chris@16: BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); Chris@16: m_ThreadID = this_thread::get_id(); Chris@16: } Chris@16: //! Destructor Chris@16: ~scoped_thread_id() Chris@16: { Chris@16: try Chris@16: { Chris@16: lock_guard< frontend_mutex_type > lock(m_Mutex); Chris@16: m_StopRequested = false; Chris@16: m_ThreadID = thread::id(); Chris@16: m_Cond.notify_all(); Chris@16: } Chris@16: catch (...) Chris@16: { Chris@16: } Chris@16: } Chris@16: Chris@16: private: Chris@16: scoped_thread_id(scoped_thread_id const&); Chris@16: scoped_thread_id& operator= (scoped_thread_id const&); Chris@16: }; Chris@16: Chris@16: //! A scope guard that resets a flag on destructor Chris@16: class scoped_flag Chris@16: { Chris@16: private: Chris@16: frontend_mutex_type& m_Mutex; Chris@16: condition_variable_any& m_Cond; Chris@16: volatile bool& m_Flag; Chris@16: Chris@16: public: Chris@16: explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, volatile bool& f) : Chris@16: m_Mutex(mut), m_Cond(cond), m_Flag(f) Chris@16: { Chris@16: } Chris@16: ~scoped_flag() Chris@16: { Chris@16: try Chris@16: { Chris@16: lock_guard< frontend_mutex_type > lock(m_Mutex); Chris@16: m_Flag = false; Chris@16: m_Cond.notify_all(); Chris@16: } Chris@16: catch (...) Chris@16: { Chris@16: } Chris@16: } Chris@16: Chris@16: private: Chris@16: scoped_flag(scoped_flag const&); Chris@16: scoped_flag& operator= (scoped_flag const&); Chris@16: }; Chris@16: Chris@16: public: Chris@16: //! Sink implementation type Chris@16: typedef SinkBackendT sink_backend_type; Chris@16: //! \cond Chris@16: BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met"); Chris@16: //! \endcond Chris@16: Chris@16: #ifndef BOOST_LOG_DOXYGEN_PASS Chris@16: Chris@16: //! A pointer type that locks the backend until it's destroyed Chris@101: typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr; Chris@16: Chris@16: #else // BOOST_LOG_DOXYGEN_PASS Chris@16: Chris@16: //! A pointer type that locks the backend until it's destroyed Chris@16: typedef implementation_defined locked_backend_ptr; Chris@16: Chris@16: #endif // BOOST_LOG_DOXYGEN_PASS Chris@16: Chris@16: private: Chris@16: //! Synchronization mutex Chris@16: backend_mutex_type m_BackendMutex; Chris@16: //! Pointer to the backend Chris@16: const shared_ptr< sink_backend_type > m_pBackend; Chris@16: Chris@16: //! Dedicated record feeding thread Chris@16: thread m_DedicatedFeedingThread; Chris@16: //! Feeding thread ID Chris@16: thread::id m_FeedingThreadID; Chris@16: //! Condition variable to implement blocking operations Chris@16: condition_variable_any m_BlockCond; Chris@16: Chris@16: //! The flag indicates that the feeding loop has to be stopped Chris@16: volatile bool m_StopRequested; // TODO: make it a real atomic Chris@16: //! The flag indicates that queue flush has been requested Chris@16: volatile bool m_FlushRequested; // TODO: make it a real atomic Chris@16: Chris@16: public: Chris@16: /*! Chris@16: * Default constructor. Constructs the sink backend instance. Chris@16: * Requires the backend to be default-constructible. Chris@16: * Chris@16: * \param start_thread If \c true, the frontend creates a thread to feed Chris@16: * log records to the backend. Otherwise no thread is Chris@16: * started and it is assumed that the user will call Chris@16: * either \c run or \c feed_records himself. Chris@16: */ Chris@16: asynchronous_sink(bool start_thread = true) : Chris@16: base_type(true), Chris@16: m_pBackend(boost::make_shared< sink_backend_type >()), Chris@16: m_StopRequested(false), Chris@16: m_FlushRequested(false) Chris@16: { Chris@16: if (start_thread) Chris@16: start_feeding_thread(); Chris@16: } Chris@16: /*! Chris@16: * Constructor attaches user-constructed backend instance Chris@16: * Chris@16: * \param backend Pointer to the backend instance. Chris@16: * \param start_thread If \c true, the frontend creates a thread to feed Chris@16: * log records to the backend. Otherwise no thread is Chris@16: * started and it is assumed that the user will call Chris@16: * either \c run or \c feed_records himself. Chris@16: * Chris@16: * \pre \a backend is not \c NULL. Chris@16: */ Chris@16: explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) : Chris@16: base_type(true), Chris@16: m_pBackend(backend), Chris@16: m_StopRequested(false), Chris@16: m_FlushRequested(false) Chris@16: { Chris@16: if (start_thread) Chris@16: start_feeding_thread(); Chris@16: } Chris@16: Chris@16: // Constructors that pass arbitrary parameters to the backend constructor Chris@16: BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~) Chris@16: Chris@16: /*! Chris@16: * Destructor. Implicitly stops the dedicated feeding thread, if one is running. Chris@16: */ Chris@16: ~asynchronous_sink() Chris@16: { Chris@16: boost::this_thread::disable_interruption no_interrupts; Chris@16: stop(); Chris@16: } Chris@16: Chris@16: /*! Chris@16: * Locking accessor to the attached backend Chris@16: */ Chris@16: locked_backend_ptr locked_backend() Chris@16: { Chris@101: return locked_backend_ptr(m_pBackend, m_BackendMutex); Chris@16: } Chris@16: Chris@16: /*! Chris@16: * Enqueues the log record to the backend Chris@16: */ Chris@16: void consume(record_view const& rec) Chris@16: { Chris@16: if (m_FlushRequested) Chris@16: { Chris@16: unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); Chris@16: // Wait until flush is done Chris@16: while (m_FlushRequested) Chris@16: m_BlockCond.wait(lock); Chris@16: } Chris@16: queue_base_type::enqueue(rec); Chris@16: } Chris@16: Chris@16: /*! Chris@16: * The method attempts to pass logging record to the backend Chris@16: */ Chris@16: bool try_consume(record_view const& rec) Chris@16: { Chris@16: if (!m_FlushRequested) Chris@16: { Chris@16: return queue_base_type::try_enqueue(rec); Chris@16: } Chris@16: else Chris@16: return false; Chris@16: } Chris@16: Chris@16: /*! Chris@16: * The method starts record feeding loop and effectively blocks until either of this happens: Chris@16: * Chris@16: * \li the thread is interrupted due to either standard thread interruption or a call to \c stop Chris@16: * \li an exception is thrown while processing a log record in the backend, and the exception is Chris@16: * not terminated by the exception handler, if one is installed Chris@16: * Chris@16: * \pre The sink frontend must be constructed without spawning a dedicated thread Chris@16: */ Chris@16: void run() Chris@16: { Chris@16: // First check that no other thread is running Chris@16: scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); Chris@16: Chris@16: // Now start the feeding loop Chris@16: while (true) Chris@16: { Chris@16: do_feed_records(); Chris@16: if (!m_StopRequested) Chris@16: { Chris@16: // Block until new record is available Chris@16: record_view rec; Chris@16: if (queue_base_type::dequeue_ready(rec)) Chris@16: base_type::feed_record(rec, m_BackendMutex, *m_pBackend); Chris@16: } Chris@16: else Chris@16: break; Chris@16: } Chris@16: } Chris@16: Chris@16: /*! Chris@16: * The method softly interrupts record feeding loop. This method must be called when the \c run Chris@16: * method execution has to be interrupted. Unlike regular thread interruption, calling Chris@16: * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend Chris@16: * will attempt to finish its business with the record in progress and return afterwards. Chris@16: * This method can be called either if the sink was created with a dedicated thread, Chris@16: * or if the feeding loop was initiated by user. Chris@16: * Chris@16: * \note Returning from this method does not guarantee that there are no records left buffered Chris@16: * in the sink frontend. It is possible that log records keep coming during and after this Chris@16: * method is called. At some point of execution of this method log records stop being processed, Chris@16: * and all records that come after this point are put into the queue. These records will be Chris@16: * processed upon further calls to \c run or \c feed_records. Chris@16: */ Chris@16: void stop() Chris@16: { Chris@16: unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); Chris@16: if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) Chris@16: { Chris@16: try Chris@16: { Chris@16: m_StopRequested = true; Chris@16: queue_base_type::interrupt_dequeue(); Chris@16: while (m_StopRequested) Chris@16: m_BlockCond.wait(lock); Chris@16: } Chris@16: catch (...) Chris@16: { Chris@16: m_StopRequested = false; Chris@16: throw; Chris@16: } Chris@16: Chris@16: lock.unlock(); Chris@16: m_DedicatedFeedingThread.join(); Chris@16: } Chris@16: } Chris@16: Chris@16: /*! Chris@16: * The method feeds log records that may have been buffered to the backend and returns Chris@16: * Chris@16: * \pre The sink frontend must be constructed without spawning a dedicated thread Chris@16: */ Chris@16: void feed_records() Chris@16: { Chris@16: // First check that no other thread is running Chris@16: scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); Chris@16: Chris@16: // Now start the feeding loop Chris@16: do_feed_records(); Chris@16: } Chris@16: Chris@16: /*! Chris@16: * The method feeds all log records that may have been buffered to the backend and returns. Chris@16: * Unlike \c feed_records, in case of ordering queueing the method also feeds records Chris@16: * that were enqueued during the ordering window, attempting to empty the queue completely. Chris@16: * Chris@16: * \pre The sink frontend must be constructed without spawning a dedicated thread Chris@16: */ Chris@16: void flush() Chris@16: { Chris@16: unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); Chris@16: if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) Chris@16: { Chris@16: // There is already a thread feeding records, let it do the job Chris@16: m_FlushRequested = true; Chris@16: queue_base_type::interrupt_dequeue(); Chris@16: while (!m_StopRequested && m_FlushRequested) Chris@16: m_BlockCond.wait(lock); Chris@16: Chris@16: // The condition may have been signalled when the feeding thread was finishing. Chris@16: // In that case records may not have been flushed, and we do the flush ourselves. Chris@16: if (m_FeedingThreadID != thread::id()) Chris@16: return; Chris@16: } Chris@16: Chris@16: m_FlushRequested = true; Chris@16: Chris@16: // Flush records ourselves. The guard releases the lock. Chris@16: scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested); Chris@16: Chris@16: do_feed_records(); Chris@16: } Chris@16: Chris@16: private: Chris@16: #ifndef BOOST_LOG_DOXYGEN_PASS Chris@16: //! The method spawns record feeding thread Chris@16: void start_feeding_thread() Chris@16: { Chris@16: boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread); Chris@16: } Chris@16: Chris@16: //! The record feeding loop Chris@16: void do_feed_records() Chris@16: { Chris@16: while (!m_StopRequested) Chris@16: { Chris@16: record_view rec; Chris@101: bool dequeued = false; Chris@16: if (!m_FlushRequested) Chris@16: dequeued = queue_base_type::try_dequeue_ready(rec); Chris@16: else Chris@16: dequeued = queue_base_type::try_dequeue(rec); Chris@16: Chris@16: if (dequeued) Chris@16: base_type::feed_record(rec, m_BackendMutex, *m_pBackend); Chris@16: else Chris@16: break; Chris@16: } Chris@16: Chris@16: if (m_FlushRequested) Chris@16: { Chris@16: scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested); Chris@16: base_type::flush_backend(m_BackendMutex, *m_pBackend); Chris@16: } Chris@16: } Chris@16: #endif // BOOST_LOG_DOXYGEN_PASS Chris@16: }; Chris@16: Chris@16: #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL Chris@16: Chris@16: } // namespace sinks Chris@16: Chris@16: BOOST_LOG_CLOSE_NAMESPACE // namespace log Chris@16: Chris@16: } // namespace boost Chris@16: Chris@16: #include Chris@16: Chris@16: #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_