annotate DEPENDENCIES/generic/include/boost/asio/detail/impl/task_io_service.ipp @ 46:d572322e2efe

Fix to .cat file check (was susceptible to DOS line-endings) and subrepo update
author Chris Cannam
date Thu, 07 Aug 2014 14:39:38 +0100
parents 2665513ce2d3
children c530137014c0
rev   line source
Chris@16 1 //
Chris@16 2 // detail/impl/task_io_service.ipp
Chris@16 3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Chris@16 4 //
Chris@16 5 // Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
Chris@16 6 //
Chris@16 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
Chris@16 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Chris@16 9 //
Chris@16 10
Chris@16 11 #ifndef BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
Chris@16 12 #define BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
Chris@16 13
Chris@16 14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
Chris@16 15 # pragma once
Chris@16 16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
Chris@16 17
Chris@16 18 #include <boost/asio/detail/config.hpp>
Chris@16 19
Chris@16 20 #if !defined(BOOST_ASIO_HAS_IOCP)
Chris@16 21
Chris@16 22 #include <boost/asio/detail/event.hpp>
Chris@16 23 #include <boost/asio/detail/limits.hpp>
Chris@16 24 #include <boost/asio/detail/reactor.hpp>
Chris@16 25 #include <boost/asio/detail/task_io_service.hpp>
Chris@16 26 #include <boost/asio/detail/task_io_service_thread_info.hpp>
Chris@16 27
Chris@16 28 #include <boost/asio/detail/push_options.hpp>
Chris@16 29
Chris@16 30 namespace boost {
Chris@16 31 namespace asio {
Chris@16 32 namespace detail {
Chris@16 33
// Scope guard placed around a run of the task (see do_run_one and
// do_poll_one). All of its work happens in the destructor, so it is performed
// on normal exit from the enclosing scope and also during stack unwinding.
// The destructor re-acquires *lock_ and leaves it locked.
Chris@16 34 struct task_io_service::task_cleanup
Chris@16 35 {
Chris@16 36 ~task_cleanup()
Chris@16 37 {
// Fold any work this thread counted privately into the shared counter, then
// reset the private count.
Chris@16 38 if (this_thread_->private_outstanding_work > 0)
Chris@16 39 {
Chris@16 40 boost::asio::detail::increment(
Chris@16 41 task_io_service_->outstanding_work_,
Chris@16 42 this_thread_->private_outstanding_work);
Chris@16 43 }
Chris@16 44 this_thread_->private_outstanding_work = 0;
Chris@16 45
Chris@16 46 // Enqueue the completed operations and reinsert the task at the end of
Chris@16 47 // the operation queue.
Chris@16 48 lock_->lock();
// The task is no longer running, so flag it as interrupted; the wake/stop
// helpers only call task_->interrupt() while this flag is false.
Chris@16 49 task_io_service_->task_interrupted_ = true;
Chris@16 50 task_io_service_->op_queue_.push(this_thread_->private_op_queue);
Chris@16 51 task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
Chris@16 52 }
Chris@16 53
// Service whose shared queue and counters are updated.
Chris@16 54 task_io_service* task_io_service_;
// Lock to re-acquire; expected to be unlocked when the destructor runs.
Chris@16 55 mutex::scoped_lock* lock_;
// Per-thread state holding the private work count and private op queue.
Chris@16 56 thread_info* this_thread_;
Chris@16 57 };
Chris@16 58
// Scope guard placed around the completion of a single operation (see
// do_run_one and do_poll_one). Its destructor reconciles the shared
// outstanding-work counter with the work this thread counted privately and
// hands any privately queued operations back to the shared queue.
Chris@16 59 struct task_io_service::work_cleanup
Chris@16 60 {
Chris@16 61 ~work_cleanup()
Chris@16 62 {
// Net adjustment to the shared counter is (private count - 1):
//   > 1  : add the surplus,
//   == 1 : nothing to do,
//   < 1  : call work_finished().
// NOTE(review): the "- 1" presumably accounts for the operation that has just
// been completed -- confirm against work_started()/work_finished().
Chris@16 63 if (this_thread_->private_outstanding_work > 1)
Chris@16 64 {
Chris@16 65 boost::asio::detail::increment(
Chris@16 66 task_io_service_->outstanding_work_,
Chris@16 67 this_thread_->private_outstanding_work - 1);
Chris@16 68 }
Chris@16 69 else if (this_thread_->private_outstanding_work < 1)
Chris@16 70 {
Chris@16 71 task_io_service_->work_finished();
Chris@16 72 }
Chris@16 73 this_thread_->private_outstanding_work = 0;
Chris@16 74
Chris@16 75 #if defined(BOOST_ASIO_HAS_THREADS)
// Only take the lock when there is something to transfer. When taken here,
// the lock is left held on exit from the destructor.
Chris@16 76 if (!this_thread_->private_op_queue.empty())
Chris@16 77 {
Chris@16 78 lock_->lock();
Chris@16 79 task_io_service_->op_queue_.push(this_thread_->private_op_queue);
Chris@16 80 }
Chris@16 81 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 82 }
Chris@16 83
// Service whose shared queue and counters are updated.
Chris@16 84 task_io_service* task_io_service_;
// Lock used when transferring the private queue.
Chris@16 85 mutex::scoped_lock* lock_;
// Per-thread state holding the private work count and private op queue.
Chris@16 86 thread_info* this_thread_;
Chris@16 87 };
Chris@16 88
// Constructor. Starts with no task installed (task_ == 0), the task flagged
// as interrupted, no outstanding work, not stopped, not shut down and an
// empty idle-thread list. one_thread_ is set only when concurrency_hint is
// exactly 1; elsewhere in this file it enables the thread-private queue fast
// paths and suppresses waking additional threads.
Chris@16 89 task_io_service::task_io_service(
Chris@16 90 boost::asio::io_service& io_service, std::size_t concurrency_hint)
Chris@16 91 : boost::asio::detail::service_base<task_io_service>(io_service),
Chris@16 92 one_thread_(concurrency_hint == 1),
Chris@16 93 mutex_(),
Chris@16 94 task_(0),
Chris@16 95 task_interrupted_(true),
Chris@16 96 outstanding_work_(0),
Chris@16 97 stopped_(false),
Chris@16 98 shutdown_(false),
Chris@16 99 first_idle_thread_(0)
Chris@16 100 {
Chris@16 101 BOOST_ASIO_HANDLER_TRACKING_INIT;
Chris@16 102 }
Chris@16 103
// Marks the service as shut down and destroys every operation still in the
// shared queue without invoking it.
Chris@16 104 void task_io_service::shutdown_service()
Chris@16 105 {
// The flag is set under the mutex; the queue is drained after the explicit
// unlock below. init_task() checks shutdown_ before installing a task.
Chris@16 106 mutex::scoped_lock lock(mutex_);
Chris@16 107 shutdown_ = true;
Chris@16 108 lock.unlock();
Chris@16 109
Chris@16 110 // Destroy handler objects.
Chris@16 111 while (!op_queue_.empty())
Chris@16 112 {
Chris@16 113 operation* o = op_queue_.front();
Chris@16 114 op_queue_.pop();
// task_operation_ is a member of this object used as a queue marker, so it
// is skipped rather than destroyed.
Chris@16 115 if (o != &task_operation_)
Chris@16 116 o->destroy();
Chris@16 117 }
Chris@16 118
Chris@16 119 // Reset to initial state.
Chris@16 120 task_ = 0;
Chris@16 121 }
Chris@16 122
// Installs the reactor as this service's task, once. Does nothing if the
// service has been shut down or a task is already installed.
Chris@16 123 void task_io_service::init_task()
Chris@16 124 {
Chris@16 125 mutex::scoped_lock lock(mutex_);
Chris@16 126 if (!shutdown_ && !task_)
Chris@16 127 {
Chris@16 128 task_ = &use_service<reactor>(this->get_io_service());
// Queue the marker operation so that a thread processing the queue will run
// the task, then wake a thread to pick it up (this also releases the lock).
Chris@16 129 op_queue_.push(&task_operation_);
Chris@16 130 wake_one_thread_and_unlock(lock);
Chris@16 131 }
Chris@16 132 }
Chris@16 133
// Runs handlers on the calling thread until do_run_one() returns 0.
//
// ec: cleared on entry and passed through to do_run_one().
// Returns the number of handlers executed, saturating at the maximum value
// of std::size_t. If there is no outstanding work on entry, the service is
// stopped and 0 is returned immediately.
Chris@16 134 std::size_t task_io_service::run(boost::system::error_code& ec)
Chris@16 135 {
Chris@16 136 ec = boost::system::error_code();
Chris@16 137 if (outstanding_work_ == 0)
Chris@16 138 {
Chris@16 139 stop();
Chris@16 140 return 0;
Chris@16 141 }
Chris@16 142
// Per-call thread state. The wakeup event is what do_run_one() waits on when
// the queue is empty. ctx registers this thread on the call stack for this
// service so that thread_call_stack::contains(this) can find this_thread.
Chris@16 143 thread_info this_thread;
Chris@16 144 event wakeup_event;
Chris@16 145 this_thread.wakeup_event = &wakeup_event;
Chris@16 146 this_thread.private_outstanding_work = 0;
Chris@16 147 this_thread.next = 0;
Chris@16 148 thread_call_stack::context ctx(this, this_thread);
Chris@16 149
Chris@16 150 mutex::scoped_lock lock(mutex_);
Chris@16 151
// The lock is re-acquired in the loop increment before each further call to
// do_run_one(). The count is only incremented while below the maximum, so it
// cannot wrap.
Chris@16 152 std::size_t n = 0;
Chris@16 153 for (; do_run_one(lock, this_thread, ec); lock.lock())
Chris@16 154 if (n != (std::numeric_limits<std::size_t>::max)())
Chris@16 155 ++n;
Chris@16 156 return n;
Chris@16 157 }
Chris@16 158
// Makes a single call to do_run_one() on the calling thread and returns its
// result (1 if a handler was executed, 0 if the service was stopped).
//
// ec: cleared on entry and passed through to do_run_one().
// If there is no outstanding work on entry, the service is stopped and 0 is
// returned immediately.
Chris@16 159 std::size_t task_io_service::run_one(boost::system::error_code& ec)
Chris@16 160 {
Chris@16 161 ec = boost::system::error_code();
Chris@16 162 if (outstanding_work_ == 0)
Chris@16 163 {
Chris@16 164 stop();
Chris@16 165 return 0;
Chris@16 166 }
Chris@16 167
// Per-call thread state, set up exactly as in run(): the wakeup event is what
// do_run_one() waits on when the queue is empty.
Chris@16 168 thread_info this_thread;
Chris@16 169 event wakeup_event;
Chris@16 170 this_thread.wakeup_event = &wakeup_event;
Chris@16 171 this_thread.private_outstanding_work = 0;
Chris@16 172 this_thread.next = 0;
Chris@16 173 thread_call_stack::context ctx(this, this_thread);
Chris@16 174
Chris@16 175 mutex::scoped_lock lock(mutex_);
Chris@16 176
Chris@16 177 return do_run_one(lock, this_thread, ec);
Chris@16 178 }
Chris@16 179
// Runs handlers on the calling thread by calling do_poll_one() repeatedly
// until it returns 0.
//
// ec: cleared on entry and passed through to do_poll_one().
// Returns the number of handlers executed, saturating at the maximum value
// of std::size_t. If there is no outstanding work on entry, the service is
// stopped and 0 is returned immediately.
Chris@16 180 std::size_t task_io_service::poll(boost::system::error_code& ec)
Chris@16 181 {
Chris@16 182 ec = boost::system::error_code();
Chris@16 183 if (outstanding_work_ == 0)
Chris@16 184 {
Chris@16 185 stop();
Chris@16 186 return 0;
Chris@16 187 }
Chris@16 188
// No wakeup event is created here: do_poll_one() never touches
// this_thread.wakeup_event.
Chris@16 189 thread_info this_thread;
Chris@16 190 this_thread.wakeup_event = 0;
Chris@16 191 this_thread.private_outstanding_work = 0;
Chris@16 192 this_thread.next = 0;
Chris@16 193 thread_call_stack::context ctx(this, this_thread);
Chris@16 194
Chris@16 195 mutex::scoped_lock lock(mutex_);
Chris@16 196
Chris@16 197 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 198 // We want to support nested calls to poll() and poll_one(), so any handlers
Chris@16 199 // that are already on a thread-private queue need to be put on to the main
Chris@16 200 // queue now.
Chris@16 201 if (one_thread_)
Chris@16 202 if (thread_info* outer_thread_info = ctx.next_by_key())
Chris@16 203 op_queue_.push(outer_thread_info->private_op_queue);
Chris@16 204 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 205
// The lock is re-acquired in the loop increment before each further call to
// do_poll_one(). The count is only incremented while below the maximum.
Chris@16 206 std::size_t n = 0;
Chris@16 207 for (; do_poll_one(lock, this_thread, ec); lock.lock())
Chris@16 208 if (n != (std::numeric_limits<std::size_t>::max)())
Chris@16 209 ++n;
Chris@16 210 return n;
Chris@16 211 }
Chris@16 212
// Makes a single call to do_poll_one() on the calling thread and returns its
// result (1 if a handler was executed, otherwise 0).
//
// ec: cleared on entry and passed through to do_poll_one().
// If there is no outstanding work on entry, the service is stopped and 0 is
// returned immediately.
Chris@16 213 std::size_t task_io_service::poll_one(boost::system::error_code& ec)
Chris@16 214 {
Chris@16 215 ec = boost::system::error_code();
Chris@16 216 if (outstanding_work_ == 0)
Chris@16 217 {
Chris@16 218 stop();
Chris@16 219 return 0;
Chris@16 220 }
Chris@16 221
// No wakeup event is created here: do_poll_one() never touches
// this_thread.wakeup_event.
Chris@16 222 thread_info this_thread;
Chris@16 223 this_thread.wakeup_event = 0;
Chris@16 224 this_thread.private_outstanding_work = 0;
Chris@16 225 this_thread.next = 0;
Chris@16 226 thread_call_stack::context ctx(this, this_thread);
Chris@16 227
Chris@16 228 mutex::scoped_lock lock(mutex_);
Chris@16 229
Chris@16 230 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 231 // We want to support nested calls to poll() and poll_one(), so any handlers
Chris@16 232 // that are already on a thread-private queue need to be put on to the main
Chris@16 233 // queue now.
Chris@16 234 if (one_thread_)
Chris@16 235 if (thread_info* outer_thread_info = ctx.next_by_key())
Chris@16 236 op_queue_.push(outer_thread_info->private_op_queue);
Chris@16 237 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 238
Chris@16 239 return do_poll_one(lock, this_thread, ec);
Chris@16 240 }
Chris@16 241
// Stops the service: takes the mutex and delegates to stop_all_threads(),
// which sets stopped_, signals idle threads and interrupts the task.
Chris@16 242 void task_io_service::stop()
Chris@16 243 {
Chris@16 244 mutex::scoped_lock lock(mutex_);
Chris@16 245 stop_all_threads(lock);
Chris@16 246 }
Chris@16 247
// Returns the current value of the stopped_ flag, read under the mutex.
Chris@16 248 bool task_io_service::stopped() const
Chris@16 249 {
Chris@16 250 mutex::scoped_lock lock(mutex_);
Chris@16 251 return stopped_;
Chris@16 252 }
Chris@16 253
// Clears the stopped_ flag under the mutex. Nothing else is changed here:
// queued operations and the work count are left as they are.
Chris@16 254 void task_io_service::reset()
Chris@16 255 {
Chris@16 256 mutex::scoped_lock lock(mutex_);
Chris@16 257 stopped_ = false;
Chris@16 258 }
Chris@16 259
// Queues op for execution and counts it as a new unit of outstanding work.
//
// op: the operation to queue.
// is_continuation: when true (or when one_thread_ is set) and the caller is
//   already running inside this service, the operation goes on the calling
//   thread's private queue instead of the shared one.
Chris@16 260 void task_io_service::post_immediate_completion(
Chris@16 261 task_io_service::operation* op, bool is_continuation)
Chris@16 262 {
Chris@16 263 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 264 if (one_thread_ || is_continuation)
Chris@16 265 {
// Fast path: no mutex is taken. The work is counted privately and later
// folded into the shared counter by task_cleanup / work_cleanup.
Chris@16 266 if (thread_info* this_thread = thread_call_stack::contains(this))
Chris@16 267 {
Chris@16 268 ++this_thread->private_outstanding_work;
Chris@16 269 this_thread->private_op_queue.push(op);
Chris@16 270 return;
Chris@16 271 }
Chris@16 272 }
Chris@16 273 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 274
// General path: count the work, push onto the shared queue under the mutex
// and wake a thread (which also releases the lock).
Chris@16 275 work_started();
Chris@16 276 mutex::scoped_lock lock(mutex_);
Chris@16 277 op_queue_.push(op);
Chris@16 278 wake_one_thread_and_unlock(lock);
Chris@16 279 }
Chris@16 280
// Queues op for execution without changing any work count: unlike
// post_immediate_completion(), neither work_started() nor the private
// counter is touched here.
// NOTE(review): presumably the caller has already accounted for this
// operation's work -- confirm at the call sites.
Chris@16 281 void task_io_service::post_deferred_completion(task_io_service::operation* op)
Chris@16 282 {
Chris@16 283 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 284 if (one_thread_)
Chris@16 285 {
// Fast path: when the caller is already running inside this service, use
// its private queue and avoid taking the mutex.
Chris@16 286 if (thread_info* this_thread = thread_call_stack::contains(this))
Chris@16 287 {
Chris@16 288 this_thread->private_op_queue.push(op);
Chris@16 289 return;
Chris@16 290 }
Chris@16 291 }
Chris@16 292 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 293
Chris@16 294 mutex::scoped_lock lock(mutex_);
Chris@16 295 op_queue_.push(op);
Chris@16 296 wake_one_thread_and_unlock(lock);
Chris@16 297 }
Chris@16 298
// Queues every operation in ops for execution without changing any work
// count. Does nothing when ops is empty, so in that case the mutex is not
// taken and no thread is woken.
Chris@16 299 void task_io_service::post_deferred_completions(
Chris@16 300 op_queue<task_io_service::operation>& ops)
Chris@16 301 {
Chris@16 302 if (!ops.empty())
Chris@16 303 {
Chris@16 304 #if defined(BOOST_ASIO_HAS_THREADS)
Chris@16 305 if (one_thread_)
Chris@16 306 {
// Fast path: when the caller is already running inside this service, use
// its private queue and avoid taking the mutex.
Chris@16 307 if (thread_info* this_thread = thread_call_stack::contains(this))
Chris@16 308 {
Chris@16 309 this_thread->private_op_queue.push(ops);
Chris@16 310 return;
Chris@16 311 }
Chris@16 312 }
Chris@16 313 #endif // defined(BOOST_ASIO_HAS_THREADS)
Chris@16 314
Chris@16 315 mutex::scoped_lock lock(mutex_);
Chris@16 316 op_queue_.push(ops);
Chris@16 317 wake_one_thread_and_unlock(lock);
Chris@16 318 }
Chris@16 319 }
Chris@16 320
// Counts op as new outstanding work, pushes it onto the shared queue under
// the mutex and wakes a thread. This always uses the shared queue; there is
// no thread-private fast path here.
Chris@16 321 void task_io_service::do_dispatch(
Chris@16 322 task_io_service::operation* op)
Chris@16 323 {
Chris@16 324 work_started();
Chris@16 325 mutex::scoped_lock lock(mutex_);
Chris@16 326 op_queue_.push(op);
Chris@16 327 wake_one_thread_and_unlock(lock);
Chris@16 328 }
Chris@16 329
// Transfers everything in ops into a local queue that goes out of scope when
// this function returns.
// NOTE(review): presumably op_queue's destructor destroys the operations it
// still holds, which is what disposes of them -- confirm in op_queue.hpp.
Chris@16 330 void task_io_service::abandon_operations(
Chris@16 331 op_queue<task_io_service::operation>& ops)
Chris@16 332 {
Chris@16 333 op_queue<task_io_service::operation> ops2;
Chris@16 334 ops2.push(ops);
Chris@16 335 }
Chris@16 336
// Executes at most one handler, waiting for work if the queue is empty.
//
// lock: expected to be locked on entry. It is released before the task or a
//   handler is run; after a handler it is re-acquired only if work_cleanup
//   had private operations to transfer.
// this_thread: the calling thread's state; its wakeup_event must be non-null
//   because it is dereferenced when waiting.
// ec: passed unchanged to the handler's complete() call.
// Returns 1 after executing one handler, or 0 once stopped_ is set.
Chris@16 337 std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
Chris@16 338 task_io_service::thread_info& this_thread,
Chris@16 339 const boost::system::error_code& ec)
Chris@16 340 {
Chris@16 341 while (!stopped_)
Chris@16 342 {
Chris@16 343 if (!op_queue_.empty())
Chris@16 344 {
Chris@16 345 // Prepare to execute first handler from queue.
Chris@16 346 operation* o = op_queue_.front();
Chris@16 347 op_queue_.pop();
Chris@16 348 bool more_handlers = (!op_queue_.empty());
Chris@16 349
Chris@16 350 if (o == &task_operation_)
Chris@16 351 {
// If other operations are queued, the task must not block (see the run()
// call below), so it is recorded as already interrupted.
Chris@16 352 task_interrupted_ = more_handlers;
Chris@16 353
// With more work queued and multiple threads allowed, hand the remaining
// work to an idle thread if there is one. Either way the lock ends up
// released before the task runs.
Chris@16 354 if (more_handlers && !one_thread_)
Chris@16 355 {
Chris@16 356 if (!wake_one_idle_thread_and_unlock(lock))
Chris@16 357 lock.unlock();
Chris@16 358 }
Chris@16 359 else
Chris@16 360 lock.unlock();
Chris@16 361
// on_exit's destructor re-locks and requeues the task operation when this
// block is left, so the loop condition is re-evaluated with the lock held.
Chris@16 362 task_cleanup on_exit = { this, &lock, &this_thread };
Chris@16 363 (void)on_exit;
Chris@16 364
Chris@16 365 // Run the task. May throw an exception. Only block if the operation
Chris@16 366 // queue is empty and we're not polling, otherwise we want to return
Chris@16 367 // as soon as possible.
Chris@16 368 task_->run(!more_handlers, this_thread.private_op_queue);
Chris@16 369 }
Chris@16 370 else
Chris@16 371 {
// Read task_result_ while still holding the lock, before the operation is
// completed (and deleted) below.
Chris@16 372 std::size_t task_result = o->task_result_;
Chris@16 373
Chris@16 374 if (more_handlers && !one_thread_)
Chris@16 375 wake_one_thread_and_unlock(lock);
Chris@16 376 else
Chris@16 377 lock.unlock();
Chris@16 378
Chris@16 379 // Ensure the count of outstanding work is decremented on block exit.
Chris@16 380 work_cleanup on_exit = { this, &lock, &this_thread };
Chris@16 381 (void)on_exit;
Chris@16 382
Chris@16 383 // Complete the operation. May throw an exception. Deletes the object.
Chris@16 384 o->complete(*this, ec, task_result);
Chris@16 385
Chris@16 386 return 1;
Chris@16 387 }
Chris@16 388 }
Chris@16 389 else
Chris@16 390 {
Chris@16 391 // Nothing to run right now, so just wait for work to do.
// Push this thread onto the front of the idle list so that the wake/stop
// helpers can find and signal it.
Chris@16 392 this_thread.next = first_idle_thread_;
Chris@16 393 first_idle_thread_ = &this_thread;
Chris@16 394 this_thread.wakeup_event->clear(lock);
Chris@16 395 this_thread.wakeup_event->wait(lock);
Chris@16 396 }
Chris@16 397 }
Chris@16 398
Chris@16 399 return 0;
Chris@16 400 }
Chris@16 401
// Executes at most one handler without waiting for work.
//
// lock: expected to be locked on entry. It may be either locked or unlocked
//   on return, depending on the path taken.
// this_thread: the calling thread's state; its wakeup_event is not used here.
// ec: passed unchanged to the handler's complete() call.
// Returns 1 after executing one handler, or 0 if the service is stopped or
// no handler is ready.
Chris@16 402 std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
Chris@16 403 task_io_service::thread_info& this_thread,
Chris@16 404 const boost::system::error_code& ec)
Chris@16 405 {
Chris@16 406 if (stopped_)
Chris@16 407 return 0;
Chris@16 408
// NOTE(review): front() evidently yields 0 for an empty queue, given the
// "o == 0" check further down -- confirm in op_queue.hpp.
Chris@16 409 operation* o = op_queue_.front();
Chris@16 410 if (o == &task_operation_)
Chris@16 411 {
Chris@16 412 op_queue_.pop();
Chris@16 413 lock.unlock();
Chris@16 414
// Inner scope: c's destructor runs at the closing brace, re-acquiring the
// lock and pushing this thread's private operations followed by the task
// operation back onto the shared queue.
Chris@16 415 {
Chris@16 416 task_cleanup c = { this, &lock, &this_thread };
Chris@16 417 (void)c;
Chris@16 418
Chris@16 419 // Run the task. May throw an exception. Only block if the operation
Chris@16 420 // queue is empty and we're not polling, otherwise we want to return
Chris@16 421 // as soon as possible.
Chris@16 422 task_->run(false, this_thread.private_op_queue);
Chris@16 423 }
Chris@16 424
// If the task operation is at the front again, no handler is ready to run.
// Wake an idle thread if there is one; if there is none, the lock is still
// held when this function returns 0.
Chris@16 425 o = op_queue_.front();
Chris@16 426 if (o == &task_operation_)
Chris@16 427 {
Chris@16 428 wake_one_idle_thread_and_unlock(lock);
Chris@16 429 return 0;
Chris@16 430 }
Chris@16 431 }
Chris@16 432
// Empty queue: nothing to execute. The lock is still held on this return.
Chris@16 433 if (o == 0)
Chris@16 434 return 0;
Chris@16 435
Chris@16 436 op_queue_.pop();
Chris@16 437 bool more_handlers = (!op_queue_.empty());
Chris@16 438
// Read task_result_ while still holding the lock, before the operation is
// completed (and deleted) below.
Chris@16 439 std::size_t task_result = o->task_result_;
Chris@16 440
Chris@16 441 if (more_handlers && !one_thread_)
Chris@16 442 wake_one_thread_and_unlock(lock);
Chris@16 443 else
Chris@16 444 lock.unlock();
Chris@16 445
Chris@16 446 // Ensure the count of outstanding work is decremented on block exit.
Chris@16 447 work_cleanup on_exit = { this, &lock, &this_thread };
Chris@16 448 (void)on_exit;
Chris@16 449
Chris@16 450 // Complete the operation. May throw an exception. Deletes the object.
Chris@16 451 o->complete(*this, ec, task_result);
Chris@16 452
Chris@16 453 return 1;
Chris@16 454 }
Chris@16 455
// Sets stopped_, signals every thread on the idle list and interrupts the
// task if it has not already been interrupted.
//
// lock: passed to each wakeup event's signal(); this function does not
//   unlock it itself.
Chris@16 456 void task_io_service::stop_all_threads(
Chris@16 457 mutex::scoped_lock& lock)
Chris@16 458 {
Chris@16 459 stopped_ = true;
Chris@16 460
// Drain the idle list, unlinking each thread before signalling it.
Chris@16 461 while (first_idle_thread_)
Chris@16 462 {
Chris@16 463 thread_info* idle_thread = first_idle_thread_;
Chris@16 464 first_idle_thread_ = idle_thread->next;
Chris@16 465 idle_thread->next = 0;
Chris@16 466 idle_thread->wakeup_event->signal(lock);
Chris@16 467 }
Chris@16 468
// task_interrupted_ is set first so that interrupt() is requested only once
// until the flag is cleared again.
Chris@16 469 if (!task_interrupted_ && task_)
Chris@16 470 {
Chris@16 471 task_interrupted_ = true;
Chris@16 472 task_->interrupt();
Chris@16 473 }
Chris@16 474 }
Chris@16 475
// If any thread is on the idle list, removes the first one, signals its
// wakeup event via signal_and_unlock(lock) and returns true.
// Returns false when the idle list is empty; in that case this function does
// not touch the lock, so the caller must release it if required.
Chris@16 476 bool task_io_service::wake_one_idle_thread_and_unlock(
Chris@16 477 mutex::scoped_lock& lock)
Chris@16 478 {
Chris@16 479 if (first_idle_thread_)
Chris@16 480 {
Chris@16 481 thread_info* idle_thread = first_idle_thread_;
Chris@16 482 first_idle_thread_ = idle_thread->next;
Chris@16 483 idle_thread->next = 0;
Chris@16 484 idle_thread->wakeup_event->signal_and_unlock(lock);
Chris@16 485 return true;
Chris@16 486 }
Chris@16 487 return false;
Chris@16 488 }
Chris@16 489
// Wakes one thread to process queued work: an idle thread if there is one,
// otherwise the thread running the task (by interrupting the task, unless it
// is already flagged as interrupted). On both paths the lock is handed to an
// unlocking call: signal_and_unlock() in the helper, or lock.unlock() here.
Chris@16 490 void task_io_service::wake_one_thread_and_unlock(
Chris@16 491 mutex::scoped_lock& lock)
Chris@16 492 {
Chris@16 493 if (!wake_one_idle_thread_and_unlock(lock))
Chris@16 494 {
Chris@16 495 if (!task_interrupted_ && task_)
Chris@16 496 {
Chris@16 497 task_interrupted_ = true;
Chris@16 498 task_->interrupt();
Chris@16 499 }
Chris@16 500 lock.unlock();
Chris@16 501 }
Chris@16 502 }
Chris@16 503
Chris@16 504 } // namespace detail
Chris@16 505 } // namespace asio
Chris@16 506 } // namespace boost
Chris@16 507
Chris@16 508 #include <boost/asio/detail/pop_options.hpp>
Chris@16 509
Chris@16 510 #endif // !defined(BOOST_ASIO_HAS_IOCP)
Chris@16 511
Chris@16 512 #endif // BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP