annotate DEPENDENCIES/generic/include/boost/asio/detail/impl/task_io_service.ipp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 //
Chris@16 2 // detail/impl/task_io_service.ipp
Chris@16 3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Chris@16 4 //
Chris@101 5 // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
Chris@16 6 //
Chris@16 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
Chris@16 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Chris@16 9 //
Chris@16 10
Chris@16 11 #ifndef BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
Chris@16 12 #define BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
Chris@16 13
Chris@16 14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
Chris@16 15 # pragma once
Chris@16 16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
Chris@16 17
Chris@16 18 #include <boost/asio/detail/config.hpp>
Chris@16 19
Chris@16 20 #if !defined(BOOST_ASIO_HAS_IOCP)
Chris@16 21
Chris@16 22 #include <boost/asio/detail/event.hpp>
Chris@16 23 #include <boost/asio/detail/limits.hpp>
Chris@16 24 #include <boost/asio/detail/reactor.hpp>
Chris@16 25 #include <boost/asio/detail/task_io_service.hpp>
Chris@16 26 #include <boost/asio/detail/task_io_service_thread_info.hpp>
Chris@16 27
Chris@16 28 #include <boost/asio/detail/push_options.hpp>
Chris@16 29
Chris@16 30 namespace boost {
Chris@16 31 namespace asio {
Chris@16 32 namespace detail {
Chris@16 33
Chris@16 34 struct task_io_service::task_cleanup
Chris@16 35 {
Chris@16 36 ~task_cleanup()
Chris@16 37 {
Chris@16 38 if (this_thread_->private_outstanding_work > 0)
Chris@16 39 {
Chris@16 40 boost::asio::detail::increment(
Chris@16 41 task_io_service_->outstanding_work_,
Chris@16 42 this_thread_->private_outstanding_work);
Chris@16 43 }
Chris@16 44 this_thread_->private_outstanding_work = 0;
Chris@16 45
Chris@16 46 // Enqueue the completed operations and reinsert the task at the end of
Chris@16 47 // the operation queue.
Chris@16 48 lock_->lock();
Chris@16 49 task_io_service_->task_interrupted_ = true;
Chris@16 50 task_io_service_->op_queue_.push(this_thread_->private_op_queue);
Chris@16 51 task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
Chris@16 52 }
Chris@16 53
Chris@16 54 task_io_service* task_io_service_;
Chris@16 55 mutex::scoped_lock* lock_;
Chris@16 56 thread_info* this_thread_;
Chris@16 57 };
Chris@16 58
Chris@16 59 struct task_io_service::work_cleanup
Chris@16 60 {
Chris@16 61 ~work_cleanup()
Chris@16 62 {
Chris@16 63 if (this_thread_->private_outstanding_work > 1)
Chris@16 64 {
Chris@16 65 boost::asio::detail::increment(
Chris@16 66 task_io_service_->outstanding_work_,
Chris@16 67 this_thread_->private_outstanding_work - 1);
Chris@16 68 }
Chris@16 69 else if (this_thread_->private_outstanding_work < 1)
Chris@16 70 {
Chris@16 71 task_io_service_->work_finished();
Chris@16 72 }
Chris@16 73 this_thread_->private_outstanding_work = 0;
Chris@16 74
Chris@16 75 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 76 if (!this_thread_->private_op_queue.empty())
Chris@16 77 {
Chris@16 78 lock_->lock();
Chris@16 79 task_io_service_->op_queue_.push(this_thread_->private_op_queue);
Chris@16 80 }
Chris@16 81 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 82 }
Chris@16 83
Chris@16 84 task_io_service* task_io_service_;
Chris@16 85 mutex::scoped_lock* lock_;
Chris@16 86 thread_info* this_thread_;
Chris@16 87 };
Chris@16 88
Chris@16 89 task_io_service::task_io_service(
Chris@16 90 boost::asio::io_service& io_service, std::size_t concurrency_hint)
Chris@16 91 : boost::asio::detail::service_base<task_io_service>(io_service),
Chris@16 92 one_thread_(concurrency_hint == 1),
Chris@16 93 mutex_(),
Chris@16 94 task_(0),
Chris@16 95 task_interrupted_(true),
Chris@16 96 outstanding_work_(0),
Chris@16 97 stopped_(false),
Chris@101 98 shutdown_(false)
Chris@16 99 {
Chris@16 100 BOOST_ASIO_HANDLER_TRACKING_INIT;
Chris@16 101 }
Chris@16 102
Chris@16 103 void task_io_service::shutdown_service()
Chris@16 104 {
Chris@16 105 mutex::scoped_lock lock(mutex_);
Chris@16 106 shutdown_ = true;
Chris@16 107 lock.unlock();
Chris@16 108
Chris@16 109 // Destroy handler objects.
Chris@16 110 while (!op_queue_.empty())
Chris@16 111 {
Chris@16 112 operation* o = op_queue_.front();
Chris@16 113 op_queue_.pop();
Chris@16 114 if (o != &task_operation_)
Chris@16 115 o->destroy();
Chris@16 116 }
Chris@16 117
Chris@16 118 // Reset to initial state.
Chris@16 119 task_ = 0;
Chris@16 120 }
Chris@16 121
Chris@16 122 void task_io_service::init_task()
Chris@16 123 {
Chris@16 124 mutex::scoped_lock lock(mutex_);
Chris@16 125 if (!shutdown_ && !task_)
Chris@16 126 {
Chris@16 127 task_ = &use_service<reactor>(this->get_io_service());
Chris@16 128 op_queue_.push(&task_operation_);
Chris@16 129 wake_one_thread_and_unlock(lock);
Chris@16 130 }
Chris@16 131 }
Chris@16 132
Chris@16 133 std::size_t task_io_service::run(boost::system::error_code& ec)
Chris@16 134 {
Chris@16 135 ec = boost::system::error_code();
Chris@16 136 if (outstanding_work_ == 0)
Chris@16 137 {
Chris@16 138 stop();
Chris@16 139 return 0;
Chris@16 140 }
Chris@16 141
Chris@16 142 thread_info this_thread;
Chris@16 143 this_thread.private_outstanding_work = 0;
Chris@16 144 thread_call_stack::context ctx(this, this_thread);
Chris@16 145
Chris@16 146 mutex::scoped_lock lock(mutex_);
Chris@16 147
Chris@16 148 std::size_t n = 0;
Chris@16 149 for (; do_run_one(lock, this_thread, ec); lock.lock())
Chris@16 150 if (n != (std::numeric_limits<std::size_t>::max)())
Chris@16 151 ++n;
Chris@16 152 return n;
Chris@16 153 }
Chris@16 154
Chris@16 155 std::size_t task_io_service::run_one(boost::system::error_code& ec)
Chris@16 156 {
Chris@16 157 ec = boost::system::error_code();
Chris@16 158 if (outstanding_work_ == 0)
Chris@16 159 {
Chris@16 160 stop();
Chris@16 161 return 0;
Chris@16 162 }
Chris@16 163
Chris@16 164 thread_info this_thread;
Chris@16 165 this_thread.private_outstanding_work = 0;
Chris@16 166 thread_call_stack::context ctx(this, this_thread);
Chris@16 167
Chris@16 168 mutex::scoped_lock lock(mutex_);
Chris@16 169
Chris@16 170 return do_run_one(lock, this_thread, ec);
Chris@16 171 }
Chris@16 172
Chris@16 173 std::size_t task_io_service::poll(boost::system::error_code& ec)
Chris@16 174 {
Chris@16 175 ec = boost::system::error_code();
Chris@16 176 if (outstanding_work_ == 0)
Chris@16 177 {
Chris@16 178 stop();
Chris@16 179 return 0;
Chris@16 180 }
Chris@16 181
Chris@16 182 thread_info this_thread;
Chris@16 183 this_thread.private_outstanding_work = 0;
Chris@16 184 thread_call_stack::context ctx(this, this_thread);
Chris@16 185
Chris@16 186 mutex::scoped_lock lock(mutex_);
Chris@16 187
Chris@16 188 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 189 // We want to support nested calls to poll() and poll_one(), so any handlers
Chris@16 190 // that are already on a thread-private queue need to be put on to the main
Chris@16 191 // queue now.
Chris@16 192 if (one_thread_)
Chris@16 193 if (thread_info* outer_thread_info = ctx.next_by_key())
Chris@16 194 op_queue_.push(outer_thread_info->private_op_queue);
Chris@16 195 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 196
Chris@16 197 std::size_t n = 0;
Chris@16 198 for (; do_poll_one(lock, this_thread, ec); lock.lock())
Chris@16 199 if (n != (std::numeric_limits<std::size_t>::max)())
Chris@16 200 ++n;
Chris@16 201 return n;
Chris@16 202 }
Chris@16 203
Chris@16 204 std::size_t task_io_service::poll_one(boost::system::error_code& ec)
Chris@16 205 {
Chris@16 206 ec = boost::system::error_code();
Chris@16 207 if (outstanding_work_ == 0)
Chris@16 208 {
Chris@16 209 stop();
Chris@16 210 return 0;
Chris@16 211 }
Chris@16 212
Chris@16 213 thread_info this_thread;
Chris@16 214 this_thread.private_outstanding_work = 0;
Chris@16 215 thread_call_stack::context ctx(this, this_thread);
Chris@16 216
Chris@16 217 mutex::scoped_lock lock(mutex_);
Chris@16 218
Chris@16 219 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 220 // We want to support nested calls to poll() and poll_one(), so any handlers
Chris@16 221 // that are already on a thread-private queue need to be put on to the main
Chris@16 222 // queue now.
Chris@16 223 if (one_thread_)
Chris@16 224 if (thread_info* outer_thread_info = ctx.next_by_key())
Chris@16 225 op_queue_.push(outer_thread_info->private_op_queue);
Chris@16 226 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 227
Chris@16 228 return do_poll_one(lock, this_thread, ec);
Chris@16 229 }
Chris@16 230
Chris@16 231 void task_io_service::stop()
Chris@16 232 {
Chris@16 233 mutex::scoped_lock lock(mutex_);
Chris@16 234 stop_all_threads(lock);
Chris@16 235 }
Chris@16 236
Chris@16 237 bool task_io_service::stopped() const
Chris@16 238 {
Chris@16 239 mutex::scoped_lock lock(mutex_);
Chris@16 240 return stopped_;
Chris@16 241 }
Chris@16 242
Chris@16 243 void task_io_service::reset()
Chris@16 244 {
Chris@16 245 mutex::scoped_lock lock(mutex_);
Chris@16 246 stopped_ = false;
Chris@16 247 }
Chris@16 248
Chris@16 249 void task_io_service::post_immediate_completion(
Chris@16 250 task_io_service::operation* op, bool is_continuation)
Chris@16 251 {
Chris@16 252 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 253 if (one_thread_ || is_continuation)
Chris@16 254 {
Chris@16 255 if (thread_info* this_thread = thread_call_stack::contains(this))
Chris@16 256 {
Chris@16 257 ++this_thread->private_outstanding_work;
Chris@16 258 this_thread->private_op_queue.push(op);
Chris@16 259 return;
Chris@16 260 }
Chris@16 261 }
Chris@101 262 #else // defined(BOOST_ASIO_HAS_THREADS)
Chris@101 263 (void)is_continuation;
Chris@16 264 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 265
Chris@16 266 work_started();
Chris@16 267 mutex::scoped_lock lock(mutex_);
Chris@16 268 op_queue_.push(op);
Chris@16 269 wake_one_thread_and_unlock(lock);
Chris@16 270 }
Chris@16 271
Chris@16 272 void task_io_service::post_deferred_completion(task_io_service::operation* op)
Chris@16 273 {
Chris@16 274 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 275 if (one_thread_)
Chris@16 276 {
Chris@16 277 if (thread_info* this_thread = thread_call_stack::contains(this))
Chris@16 278 {
Chris@16 279 this_thread->private_op_queue.push(op);
Chris@16 280 return;
Chris@16 281 }
Chris@16 282 }
Chris@16 283 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 284
Chris@16 285 mutex::scoped_lock lock(mutex_);
Chris@16 286 op_queue_.push(op);
Chris@16 287 wake_one_thread_and_unlock(lock);
Chris@16 288 }
Chris@16 289
Chris@16 290 void task_io_service::post_deferred_completions(
Chris@16 291 op_queue<task_io_service::operation>& ops)
Chris@16 292 {
Chris@16 293 if (!ops.empty())
Chris@16 294 {
Chris@16 295 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 296 if (one_thread_)
Chris@16 297 {
Chris@16 298 if (thread_info* this_thread = thread_call_stack::contains(this))
Chris@16 299 {
Chris@16 300 this_thread->private_op_queue.push(ops);
Chris@16 301 return;
Chris@16 302 }
Chris@16 303 }
Chris@16 304 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 305
Chris@16 306 mutex::scoped_lock lock(mutex_);
Chris@16 307 op_queue_.push(ops);
Chris@16 308 wake_one_thread_and_unlock(lock);
Chris@16 309 }
Chris@16 310 }
Chris@16 311
Chris@16 312 void task_io_service::do_dispatch(
Chris@16 313 task_io_service::operation* op)
Chris@16 314 {
Chris@16 315 work_started();
Chris@16 316 mutex::scoped_lock lock(mutex_);
Chris@16 317 op_queue_.push(op);
Chris@16 318 wake_one_thread_and_unlock(lock);
Chris@16 319 }
Chris@16 320
Chris@16 321 void task_io_service::abandon_operations(
Chris@16 322 op_queue<task_io_service::operation>& ops)
Chris@16 323 {
Chris@16 324 op_queue<task_io_service::operation> ops2;
Chris@16 325 ops2.push(ops);
Chris@16 326 }
Chris@16 327
Chris@16 328 std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
Chris@16 329 task_io_service::thread_info& this_thread,
Chris@16 330 const boost::system::error_code& ec)
Chris@16 331 {
Chris@16 332 while (!stopped_)
Chris@16 333 {
Chris@16 334 if (!op_queue_.empty())
Chris@16 335 {
Chris@16 336 // Prepare to execute first handler from queue.
Chris@16 337 operation* o = op_queue_.front();
Chris@16 338 op_queue_.pop();
Chris@16 339 bool more_handlers = (!op_queue_.empty());
Chris@16 340
Chris@16 341 if (o == &task_operation_)
Chris@16 342 {
Chris@16 343 task_interrupted_ = more_handlers;
Chris@16 344
Chris@16 345 if (more_handlers && !one_thread_)
Chris@101 346 wakeup_event_.unlock_and_signal_one(lock);
Chris@16 347 else
Chris@16 348 lock.unlock();
Chris@16 349
Chris@16 350 task_cleanup on_exit = { this, &lock, &this_thread };
Chris@16 351 (void)on_exit;
Chris@16 352
Chris@16 353 // Run the task. May throw an exception. Only block if the operation
Chris@16 354 // queue is empty and we're not polling, otherwise we want to return
Chris@16 355 // as soon as possible.
Chris@16 356 task_->run(!more_handlers, this_thread.private_op_queue);
Chris@16 357 }
Chris@16 358 else
Chris@16 359 {
Chris@16 360 std::size_t task_result = o->task_result_;
Chris@16 361
Chris@16 362 if (more_handlers && !one_thread_)
Chris@16 363 wake_one_thread_and_unlock(lock);
Chris@16 364 else
Chris@16 365 lock.unlock();
Chris@16 366
Chris@16 367 // Ensure the count of outstanding work is decremented on block exit.
Chris@16 368 work_cleanup on_exit = { this, &lock, &this_thread };
Chris@16 369 (void)on_exit;
Chris@16 370
Chris@16 371 // Complete the operation. May throw an exception. Deletes the object.
Chris@16 372 o->complete(*this, ec, task_result);
Chris@16 373
Chris@16 374 return 1;
Chris@16 375 }
Chris@16 376 }
Chris@16 377 else
Chris@16 378 {
Chris@101 379 wakeup_event_.clear(lock);
Chris@101 380 wakeup_event_.wait(lock);
Chris@16 381 }
Chris@16 382 }
Chris@16 383
Chris@16 384 return 0;
Chris@16 385 }
Chris@16 386
Chris@16 387 std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
Chris@16 388 task_io_service::thread_info& this_thread,
Chris@16 389 const boost::system::error_code& ec)
Chris@16 390 {
Chris@16 391 if (stopped_)
Chris@16 392 return 0;
Chris@16 393
Chris@16 394 operation* o = op_queue_.front();
Chris@16 395 if (o == &task_operation_)
Chris@16 396 {
Chris@16 397 op_queue_.pop();
Chris@16 398 lock.unlock();
Chris@16 399
Chris@16 400 {
Chris@16 401 task_cleanup c = { this, &lock, &this_thread };
Chris@16 402 (void)c;
Chris@16 403
Chris@16 404 // Run the task. May throw an exception. Only block if the operation
Chris@16 405 // queue is empty and we're not polling, otherwise we want to return
Chris@16 406 // as soon as possible.
Chris@16 407 task_->run(false, this_thread.private_op_queue);
Chris@16 408 }
Chris@16 409
Chris@16 410 o = op_queue_.front();
Chris@16 411 if (o == &task_operation_)
Chris@16 412 {
Chris@101 413 wakeup_event_.maybe_unlock_and_signal_one(lock);
Chris@16 414 return 0;
Chris@16 415 }
Chris@16 416 }
Chris@16 417
Chris@16 418 if (o == 0)
Chris@16 419 return 0;
Chris@16 420
Chris@16 421 op_queue_.pop();
Chris@16 422 bool more_handlers = (!op_queue_.empty());
Chris@16 423
Chris@16 424 std::size_t task_result = o->task_result_;
Chris@16 425
Chris@16 426 if (more_handlers && !one_thread_)
Chris@16 427 wake_one_thread_and_unlock(lock);
Chris@16 428 else
Chris@16 429 lock.unlock();
Chris@16 430
Chris@16 431 // Ensure the count of outstanding work is decremented on block exit.
Chris@16 432 work_cleanup on_exit = { this, &lock, &this_thread };
Chris@16 433 (void)on_exit;
Chris@16 434
Chris@16 435 // Complete the operation. May throw an exception. Deletes the object.
Chris@16 436 o->complete(*this, ec, task_result);
Chris@16 437
Chris@16 438 return 1;
Chris@16 439 }
Chris@16 440
Chris@16 441 void task_io_service::stop_all_threads(
Chris@16 442 mutex::scoped_lock& lock)
Chris@16 443 {
Chris@16 444 stopped_ = true;
Chris@101 445 wakeup_event_.signal_all(lock);
Chris@16 446
Chris@16 447 if (!task_interrupted_ && task_)
Chris@16 448 {
Chris@16 449 task_interrupted_ = true;
Chris@16 450 task_->interrupt();
Chris@16 451 }
Chris@16 452 }
Chris@16 453
Chris@16 454 void task_io_service::wake_one_thread_and_unlock(
Chris@16 455 mutex::scoped_lock& lock)
Chris@16 456 {
Chris@101 457 if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
Chris@16 458 {
Chris@16 459 if (!task_interrupted_ && task_)
Chris@16 460 {
Chris@16 461 task_interrupted_ = true;
Chris@16 462 task_->interrupt();
Chris@16 463 }
Chris@16 464 lock.unlock();
Chris@16 465 }
Chris@16 466 }
Chris@16 467
Chris@16 468 } // namespace detail
Chris@16 469 } // namespace asio
Chris@16 470 } // namespace boost
Chris@16 471
Chris@16 472 #include <boost/asio/detail/pop_options.hpp>
Chris@16 473
Chris@16 474 #endif // !defined(BOOST_ASIO_HAS_IOCP)
Chris@16 475
Chris@16 476 #endif // BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP