annotate DEPENDENCIES/generic/include/boost/asio/detail/impl/select_reactor.ipp @ 125:34e428693f5d vext

Vext -> Repoint
author Chris Cannam
date Thu, 14 Jun 2018 11:15:39 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 //
Chris@16 2 // detail/impl/select_reactor.ipp
Chris@16 3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Chris@16 4 //
Chris@101 5 // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
Chris@16 6 //
Chris@16 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
Chris@16 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Chris@16 9 //
Chris@16 10
Chris@16 11 #ifndef BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP
Chris@16 12 #define BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP
Chris@16 13
Chris@16 14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
Chris@16 15 # pragma once
Chris@16 16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
Chris@16 17
Chris@16 18 #include <boost/asio/detail/config.hpp>
Chris@16 19
Chris@16 20 #if defined(BOOST_ASIO_HAS_IOCP) \
Chris@16 21 || (!defined(BOOST_ASIO_HAS_DEV_POLL) \
Chris@16 22 && !defined(BOOST_ASIO_HAS_EPOLL) \
Chris@16 23 && !defined(BOOST_ASIO_HAS_KQUEUE) \
Chris@16 24 && !defined(BOOST_ASIO_WINDOWS_RUNTIME))
Chris@16 25
Chris@16 26 #include <boost/asio/detail/bind_handler.hpp>
Chris@16 27 #include <boost/asio/detail/fd_set_adapter.hpp>
Chris@16 28 #include <boost/asio/detail/select_reactor.hpp>
Chris@16 29 #include <boost/asio/detail/signal_blocker.hpp>
Chris@16 30 #include <boost/asio/detail/socket_ops.hpp>
Chris@16 31
Chris@16 32 #include <boost/asio/detail/push_options.hpp>
Chris@16 33
Chris@16 34 namespace boost {
Chris@16 35 namespace asio {
Chris@16 36 namespace detail {
Chris@16 37
Chris@16 38 select_reactor::select_reactor(boost::asio::io_service& io_service)
Chris@16 39 : boost::asio::detail::service_base<select_reactor>(io_service),
Chris@16 40 io_service_(use_service<io_service_impl>(io_service)),
Chris@16 41 mutex_(),
Chris@16 42 interrupter_(),
Chris@16 43 #if defined(BOOST_ASIO_HAS_IOCP)
Chris@16 44 stop_thread_(false),
Chris@16 45 thread_(0),
Chris@16 46 #endif // defined(BOOST_ASIO_HAS_IOCP)
Chris@16 47 shutdown_(false)
Chris@16 48 {
Chris@16 49 #if defined(BOOST_ASIO_HAS_IOCP)
Chris@16 50 boost::asio::detail::signal_blocker sb;
Chris@16 51 thread_ = new boost::asio::detail::thread(
Chris@16 52 bind_handler(&select_reactor::call_run_thread, this));
Chris@16 53 #endif // defined(BOOST_ASIO_HAS_IOCP)
Chris@16 54 }
Chris@16 55
Chris@16 56 select_reactor::~select_reactor()
Chris@16 57 {
Chris@16 58 shutdown_service();
Chris@16 59 }
Chris@16 60
Chris@16 61 void select_reactor::shutdown_service()
Chris@16 62 {
Chris@16 63 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 64 shutdown_ = true;
Chris@16 65 #if defined(BOOST_ASIO_HAS_IOCP)
Chris@16 66 stop_thread_ = true;
Chris@16 67 #endif // defined(BOOST_ASIO_HAS_IOCP)
Chris@16 68 lock.unlock();
Chris@16 69
Chris@16 70 #if defined(BOOST_ASIO_HAS_IOCP)
Chris@16 71 if (thread_)
Chris@16 72 {
Chris@16 73 interrupter_.interrupt();
Chris@16 74 thread_->join();
Chris@16 75 delete thread_;
Chris@16 76 thread_ = 0;
Chris@16 77 }
Chris@16 78 #endif // defined(BOOST_ASIO_HAS_IOCP)
Chris@16 79
Chris@16 80 op_queue<operation> ops;
Chris@16 81
Chris@16 82 for (int i = 0; i < max_ops; ++i)
Chris@16 83 op_queue_[i].get_all_operations(ops);
Chris@16 84
Chris@16 85 timer_queues_.get_all_timers(ops);
Chris@16 86
Chris@16 87 io_service_.abandon_operations(ops);
Chris@16 88 }
Chris@16 89
Chris@16 90 void select_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
Chris@16 91 {
Chris@16 92 if (fork_ev == boost::asio::io_service::fork_child)
Chris@16 93 interrupter_.recreate();
Chris@16 94 }
Chris@16 95
Chris@16 96 void select_reactor::init_task()
Chris@16 97 {
Chris@16 98 io_service_.init_task();
Chris@16 99 }
Chris@16 100
Chris@16 101 int select_reactor::register_descriptor(socket_type,
Chris@16 102 select_reactor::per_descriptor_data&)
Chris@16 103 {
Chris@16 104 return 0;
Chris@16 105 }
Chris@16 106
Chris@16 107 int select_reactor::register_internal_descriptor(
Chris@16 108 int op_type, socket_type descriptor,
Chris@16 109 select_reactor::per_descriptor_data&, reactor_op* op)
Chris@16 110 {
Chris@16 111 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 112
Chris@16 113 op_queue_[op_type].enqueue_operation(descriptor, op);
Chris@16 114 interrupter_.interrupt();
Chris@16 115
Chris@16 116 return 0;
Chris@16 117 }
Chris@16 118
Chris@16 119 void select_reactor::move_descriptor(socket_type,
Chris@16 120 select_reactor::per_descriptor_data&,
Chris@16 121 select_reactor::per_descriptor_data&)
Chris@16 122 {
Chris@16 123 }
Chris@16 124
Chris@16 125 void select_reactor::start_op(int op_type, socket_type descriptor,
Chris@16 126 select_reactor::per_descriptor_data&, reactor_op* op,
Chris@16 127 bool is_continuation, bool)
Chris@16 128 {
Chris@16 129 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 130
Chris@16 131 if (shutdown_)
Chris@16 132 {
Chris@16 133 post_immediate_completion(op, is_continuation);
Chris@16 134 return;
Chris@16 135 }
Chris@16 136
Chris@16 137 bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
Chris@16 138 io_service_.work_started();
Chris@16 139 if (first)
Chris@16 140 interrupter_.interrupt();
Chris@16 141 }
Chris@16 142
Chris@16 143 void select_reactor::cancel_ops(socket_type descriptor,
Chris@16 144 select_reactor::per_descriptor_data&)
Chris@16 145 {
Chris@16 146 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 147 cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
Chris@16 148 }
Chris@16 149
Chris@16 150 void select_reactor::deregister_descriptor(socket_type descriptor,
Chris@16 151 select_reactor::per_descriptor_data&, bool)
Chris@16 152 {
Chris@16 153 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 154 cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
Chris@16 155 }
Chris@16 156
Chris@16 157 void select_reactor::deregister_internal_descriptor(
Chris@16 158 socket_type descriptor, select_reactor::per_descriptor_data&)
Chris@16 159 {
Chris@16 160 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 161 op_queue<operation> ops;
Chris@16 162 for (int i = 0; i < max_ops; ++i)
Chris@16 163 op_queue_[i].cancel_operations(descriptor, ops);
Chris@16 164 }
Chris@16 165
Chris@16 166 void select_reactor::run(bool block, op_queue<operation>& ops)
Chris@16 167 {
Chris@16 168 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 169
Chris@16 170 #if defined(BOOST_ASIO_HAS_IOCP)
Chris@16 171 // Check if the thread is supposed to stop.
Chris@16 172 if (stop_thread_)
Chris@16 173 return;
Chris@16 174 #endif // defined(BOOST_ASIO_HAS_IOCP)
Chris@16 175
Chris@16 176 // Set up the descriptor sets.
Chris@16 177 for (int i = 0; i < max_select_ops; ++i)
Chris@16 178 fd_sets_[i].reset();
Chris@16 179 fd_sets_[read_op].set(interrupter_.read_descriptor());
Chris@16 180 socket_type max_fd = 0;
Chris@16 181 bool have_work_to_do = !timer_queues_.all_empty();
Chris@16 182 for (int i = 0; i < max_select_ops; ++i)
Chris@16 183 {
Chris@16 184 have_work_to_do = have_work_to_do || !op_queue_[i].empty();
Chris@101 185 fd_sets_[i].set(op_queue_[i], ops);
Chris@16 186 if (fd_sets_[i].max_descriptor() > max_fd)
Chris@16 187 max_fd = fd_sets_[i].max_descriptor();
Chris@16 188 }
Chris@16 189
Chris@16 190 #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
Chris@16 191 // Connection operations on Windows use both except and write fd_sets.
Chris@16 192 have_work_to_do = have_work_to_do || !op_queue_[connect_op].empty();
Chris@101 193 fd_sets_[write_op].set(op_queue_[connect_op], ops);
Chris@16 194 if (fd_sets_[write_op].max_descriptor() > max_fd)
Chris@16 195 max_fd = fd_sets_[write_op].max_descriptor();
Chris@101 196 fd_sets_[except_op].set(op_queue_[connect_op], ops);
Chris@16 197 if (fd_sets_[except_op].max_descriptor() > max_fd)
Chris@16 198 max_fd = fd_sets_[except_op].max_descriptor();
Chris@16 199 #endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
Chris@16 200
Chris@16 201 // We can return immediately if there's no work to do and the reactor is
Chris@16 202 // not supposed to block.
Chris@16 203 if (!block && !have_work_to_do)
Chris@16 204 return;
Chris@16 205
Chris@16 206 // Determine how long to block while waiting for events.
Chris@16 207 timeval tv_buf = { 0, 0 };
Chris@16 208 timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;
Chris@16 209
Chris@16 210 lock.unlock();
Chris@16 211
Chris@16 212 // Block on the select call until descriptors become ready.
Chris@16 213 boost::system::error_code ec;
Chris@16 214 int retval = socket_ops::select(static_cast<int>(max_fd + 1),
Chris@16 215 fd_sets_[read_op], fd_sets_[write_op], fd_sets_[except_op], tv, ec);
Chris@16 216
Chris@16 217 // Reset the interrupter.
Chris@16 218 if (retval > 0 && fd_sets_[read_op].is_set(interrupter_.read_descriptor()))
Chris@16 219 {
Chris@16 220 interrupter_.reset();
Chris@16 221 --retval;
Chris@16 222 }
Chris@16 223
Chris@16 224 lock.lock();
Chris@16 225
Chris@16 226 // Dispatch all ready operations.
Chris@16 227 if (retval > 0)
Chris@16 228 {
Chris@16 229 #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
Chris@16 230 // Connection operations on Windows use both except and write fd_sets.
Chris@101 231 fd_sets_[except_op].perform(op_queue_[connect_op], ops);
Chris@101 232 fd_sets_[write_op].perform(op_queue_[connect_op], ops);
Chris@16 233 #endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
Chris@16 234
Chris@16 235 // Exception operations must be processed first to ensure that any
Chris@16 236 // out-of-band data is read before normal data.
Chris@16 237 for (int i = max_select_ops - 1; i >= 0; --i)
Chris@101 238 fd_sets_[i].perform(op_queue_[i], ops);
Chris@16 239 }
Chris@16 240 timer_queues_.get_ready_timers(ops);
Chris@16 241 }
Chris@16 242
Chris@16 243 void select_reactor::interrupt()
Chris@16 244 {
Chris@16 245 interrupter_.interrupt();
Chris@16 246 }
Chris@16 247
Chris@16 248 #if defined(BOOST_ASIO_HAS_IOCP)
Chris@16 249 void select_reactor::run_thread()
Chris@16 250 {
Chris@16 251 boost::asio::detail::mutex::scoped_lock lock(mutex_);
Chris@16 252 while (!stop_thread_)
Chris@16 253 {
Chris@16 254 lock.unlock();
Chris@16 255 op_queue<operation> ops;
Chris@16 256 run(true, ops);
Chris@16 257 io_service_.post_deferred_completions(ops);
Chris@16 258 lock.lock();
Chris@16 259 }
Chris@16 260 }
Chris@16 261
Chris@16 262 void select_reactor::call_run_thread(select_reactor* reactor)
Chris@16 263 {
Chris@16 264 reactor->run_thread();
Chris@16 265 }
Chris@16 266 #endif // defined(BOOST_ASIO_HAS_IOCP)
Chris@16 267
Chris@16 268 void select_reactor::do_add_timer_queue(timer_queue_base& queue)
Chris@16 269 {
Chris@16 270 mutex::scoped_lock lock(mutex_);
Chris@16 271 timer_queues_.insert(&queue);
Chris@16 272 }
Chris@16 273
Chris@16 274 void select_reactor::do_remove_timer_queue(timer_queue_base& queue)
Chris@16 275 {
Chris@16 276 mutex::scoped_lock lock(mutex_);
Chris@16 277 timer_queues_.erase(&queue);
Chris@16 278 }
Chris@16 279
Chris@16 280 timeval* select_reactor::get_timeout(timeval& tv)
Chris@16 281 {
Chris@16 282 // By default we will wait no longer than 5 minutes. This will ensure that
Chris@16 283 // any changes to the system clock are detected after no longer than this.
Chris@16 284 long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
Chris@16 285 tv.tv_sec = usec / 1000000;
Chris@16 286 tv.tv_usec = usec % 1000000;
Chris@16 287 return &tv;
Chris@16 288 }
Chris@16 289
Chris@16 290 void select_reactor::cancel_ops_unlocked(socket_type descriptor,
Chris@16 291 const boost::system::error_code& ec)
Chris@16 292 {
Chris@16 293 bool need_interrupt = false;
Chris@16 294 op_queue<operation> ops;
Chris@16 295 for (int i = 0; i < max_ops; ++i)
Chris@16 296 need_interrupt = op_queue_[i].cancel_operations(
Chris@16 297 descriptor, ops, ec) || need_interrupt;
Chris@16 298 io_service_.post_deferred_completions(ops);
Chris@16 299 if (need_interrupt)
Chris@16 300 interrupter_.interrupt();
Chris@16 301 }
Chris@16 302
Chris@16 303 } // namespace detail
Chris@16 304 } // namespace asio
Chris@16 305 } // namespace boost
Chris@16 306
Chris@16 307 #include <boost/asio/detail/pop_options.hpp>
Chris@16 308
Chris@16 309 #endif // defined(BOOST_ASIO_HAS_IOCP)
Chris@16 310 // || (!defined(BOOST_ASIO_HAS_DEV_POLL)
Chris@16 311 // && !defined(BOOST_ASIO_HAS_EPOLL)
Chris@16 312 // && !defined(BOOST_ASIO_HAS_KQUEUE))
Chris@16 313 // && !defined(BOOST_ASIO_WINDOWS_RUNTIME))
Chris@16 314
Chris@16 315 #endif // BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP