Chris@16: // Chris@16: // detail/impl/strand_service.ipp Chris@16: // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Chris@16: // Chris@101: // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com) Chris@16: // Chris@16: // Distributed under the Boost Software License, Version 1.0. (See accompanying Chris@16: // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@16: // Chris@16: Chris@16: #ifndef BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP Chris@16: #define BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP Chris@16: Chris@16: #if defined(_MSC_VER) && (_MSC_VER >= 1200) Chris@16: # pragma once Chris@16: #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: #include Chris@16: Chris@16: namespace boost { Chris@16: namespace asio { Chris@16: namespace detail { Chris@16: Chris@16: struct strand_service::on_do_complete_exit Chris@16: { Chris@16: io_service_impl* owner_; Chris@16: strand_impl* impl_; Chris@16: Chris@16: ~on_do_complete_exit() Chris@16: { Chris@16: impl_->mutex_.lock(); Chris@16: impl_->ready_queue_.push(impl_->waiting_queue_); Chris@16: bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty(); Chris@16: impl_->mutex_.unlock(); Chris@16: Chris@16: if (more_handlers) Chris@16: owner_->post_immediate_completion(impl_, true); Chris@16: } Chris@16: }; Chris@16: Chris@16: strand_service::strand_service(boost::asio::io_service& io_service) Chris@16: : boost::asio::detail::service_base(io_service), Chris@16: io_service_(boost::asio::use_service(io_service)), Chris@16: mutex_(), Chris@16: salt_(0) Chris@16: { Chris@16: } Chris@16: Chris@16: void strand_service::shutdown_service() Chris@16: { Chris@16: op_queue ops; Chris@16: Chris@16: boost::asio::detail::mutex::scoped_lock lock(mutex_); Chris@16: Chris@16: for (std::size_t i = 0; i < num_implementations; ++i) Chris@16: { Chris@16: if (strand_impl* impl = implementations_[i].get()) Chris@16: { Chris@16: ops.push(impl->waiting_queue_); Chris@16: ops.push(impl->ready_queue_); Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: void strand_service::construct(strand_service::implementation_type& impl) Chris@16: { Chris@16: boost::asio::detail::mutex::scoped_lock lock(mutex_); Chris@16: Chris@16: std::size_t salt = salt_++; Chris@16: #if defined(BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) Chris@16: std::size_t index = salt; Chris@16: #else // defined(BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) Chris@16: std::size_t index = reinterpret_cast(&impl); Chris@16: index += (reinterpret_cast(&impl) >> 3); Chris@16: index ^= salt + 0x9e3779b9 + (index << 6) + (index >> 2); Chris@16: #endif // defined(BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) Chris@16: index = index % num_implementations; Chris@16: Chris@16: if (!implementations_[index].get()) Chris@16: implementations_[index].reset(new strand_impl); Chris@16: impl = implementations_[index].get(); Chris@16: } Chris@16: Chris@16: bool strand_service::running_in_this_thread( Chris@16: const implementation_type& impl) const Chris@16: { Chris@16: return call_stack::contains(impl) != 0; Chris@16: } Chris@16: Chris@16: bool strand_service::do_dispatch(implementation_type& impl, operation* op) Chris@16: { Chris@16: // If we are running inside the io_service, and no other handler already Chris@16: // holds the strand lock, then the handler can run immediately. Chris@16: bool can_dispatch = io_service_.can_dispatch(); Chris@16: impl->mutex_.lock(); Chris@16: if (can_dispatch && !impl->locked_) Chris@16: { Chris@16: // Immediate invocation is allowed. Chris@16: impl->locked_ = true; Chris@16: impl->mutex_.unlock(); Chris@16: return true; Chris@16: } Chris@16: Chris@16: if (impl->locked_) Chris@16: { Chris@16: // Some other handler already holds the strand lock. Enqueue for later. Chris@16: impl->waiting_queue_.push(op); Chris@16: impl->mutex_.unlock(); Chris@16: } Chris@16: else Chris@16: { Chris@16: // The handler is acquiring the strand lock and so is responsible for Chris@16: // scheduling the strand. Chris@16: impl->locked_ = true; Chris@16: impl->mutex_.unlock(); Chris@16: impl->ready_queue_.push(op); Chris@16: io_service_.post_immediate_completion(impl, false); Chris@16: } Chris@16: Chris@16: return false; Chris@16: } Chris@16: Chris@16: void strand_service::do_post(implementation_type& impl, Chris@16: operation* op, bool is_continuation) Chris@16: { Chris@16: impl->mutex_.lock(); Chris@16: if (impl->locked_) Chris@16: { Chris@16: // Some other handler already holds the strand lock. Enqueue for later. Chris@16: impl->waiting_queue_.push(op); Chris@16: impl->mutex_.unlock(); Chris@16: } Chris@16: else Chris@16: { Chris@16: // The handler is acquiring the strand lock and so is responsible for Chris@16: // scheduling the strand. Chris@16: impl->locked_ = true; Chris@16: impl->mutex_.unlock(); Chris@16: impl->ready_queue_.push(op); Chris@16: io_service_.post_immediate_completion(impl, is_continuation); Chris@16: } Chris@16: } Chris@16: Chris@16: void strand_service::do_complete(io_service_impl* owner, operation* base, Chris@16: const boost::system::error_code& ec, std::size_t /*bytes_transferred*/) Chris@16: { Chris@16: if (owner) Chris@16: { Chris@16: strand_impl* impl = static_cast(base); Chris@16: Chris@16: // Indicate that this strand is executing on the current thread. Chris@16: call_stack::context ctx(impl); Chris@16: Chris@16: // Ensure the next handler, if any, is scheduled on block exit. Chris@16: on_do_complete_exit on_exit = { owner, impl }; Chris@16: (void)on_exit; Chris@16: Chris@16: // Run all ready handlers. No lock is required since the ready queue is Chris@16: // accessed only within the strand. Chris@16: while (operation* o = impl->ready_queue_.front()) Chris@16: { Chris@16: impl->ready_queue_.pop(); Chris@16: o->complete(*owner, ec, 0); Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: } // namespace detail Chris@16: } // namespace asio Chris@16: } // namespace boost Chris@16: Chris@16: #include Chris@16: Chris@16: #endif // BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP