annotate DEPENDENCIES/generic/include/boost/thread/executors/basic_thread_pool.hpp @ 125:34e428693f5d vext

Vext -> Repoint
author Chris Cannam
date Thu, 14 Jun 2018 11:15:39 +0100
parents f46d142149f5
children
rev   line source
Chris@102 1 // Copyright (C) 2013-2014 Vicente J. Botet Escriba
Chris@102 2 //
Chris@102 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
Chris@102 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Chris@102 5 //
Chris@102 6 // 2013/09 Vicente J. Botet Escriba
Chris@102 7 // Adapt to boost from CCIA C++11 implementation
Chris@102 8 // First implementation of a simple thread pool using a vector of threads and a sync_queue.
Chris@102 9
Chris@102 10 #ifndef BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
Chris@102 11 #define BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
Chris@102 12
Chris@102 13 #include <boost/thread/detail/config.hpp>
Chris@102 14 #include <boost/thread/detail/delete.hpp>
Chris@102 15 #include <boost/thread/detail/move.hpp>
Chris@102 16 #include <boost/thread/scoped_thread.hpp>
Chris@102 17 #include <boost/thread/concurrent_queues/sync_queue.hpp>
Chris@102 18 #include <boost/thread/executors/work.hpp>
Chris@102 19 #include <boost/thread/csbl/vector.hpp>
Chris@102 20
Chris@102 21 #include <boost/config/abi_prefix.hpp>
Chris@102 22
Chris@102 23 namespace boost
Chris@102 24 {
Chris@102 25 namespace executors
Chris@102 26 {
Chris@102 27 class basic_thread_pool
Chris@102 28 {
Chris@102 29 public:
Chris@102 30 /// type-erasure to store the works to do
Chris@102 31 typedef executors::work work;
Chris@102 32 private:
Chris@102 33 /// the kind of stored threads are scoped threads to ensure that the threads are joined.
Chris@102 34 /// A move aware vector type
Chris@102 35 typedef scoped_thread<> thread_t;
Chris@102 36 typedef csbl::vector<thread_t> thread_vector;
Chris@102 37
Chris@102 38 /// the thread safe work queue
Chris@102 39 concurrent::sync_queue<work > work_queue;
Chris@102 40 /// A move aware vector
Chris@102 41 thread_vector threads;
Chris@102 42
Chris@102 43 public:
Chris@102 44 /**
Chris@102 45 * Effects: try to execute one task.
Chris@102 46 * Returns: whether a task has been executed.
Chris@102 47 * Throws: whatever the current task constructor throws or the task() throws.
Chris@102 48 */
Chris@102 49 bool try_executing_one()
Chris@102 50 {
Chris@102 51 try
Chris@102 52 {
Chris@102 53 work task;
Chris@102 54 if (work_queue.try_pull(task) == queue_op_status::success)
Chris@102 55 {
Chris@102 56 task();
Chris@102 57 return true;
Chris@102 58 }
Chris@102 59 return false;
Chris@102 60 }
Chris@102 61 catch (...)
Chris@102 62 {
Chris@102 63 std::terminate();
Chris@102 64 return false;
Chris@102 65 }
Chris@102 66 }
Chris@102 67 /**
Chris@102 68 * Effects: schedule one task or yields
Chris@102 69 * Throws: whatever the current task constructor throws or the task() throws.
Chris@102 70 */
Chris@102 71 void schedule_one_or_yield()
Chris@102 72 {
Chris@102 73 if ( ! try_executing_one())
Chris@102 74 {
Chris@102 75 this_thread::yield();
Chris@102 76 }
Chris@102 77 }
Chris@102 78 private:
Chris@102 79
Chris@102 80 /**
Chris@102 81 * The main loop of the worker threads
Chris@102 82 */
Chris@102 83 void worker_thread()
Chris@102 84 {
Chris@102 85 try
Chris@102 86 {
Chris@102 87 for(;;)
Chris@102 88 {
Chris@102 89 work task;
Chris@102 90 queue_op_status st = work_queue.wait_pull(task);
Chris@102 91 if (st == queue_op_status::closed) return;
Chris@102 92 task();
Chris@102 93 }
Chris@102 94 }
Chris@102 95 catch (...)
Chris@102 96 {
Chris@102 97 std::terminate();
Chris@102 98 return;
Chris@102 99 }
Chris@102 100 }
Chris@102 101 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
Chris@102 102 template <class AtThreadEntry>
Chris@102 103 void worker_thread1(AtThreadEntry& at_thread_entry)
Chris@102 104 {
Chris@102 105 at_thread_entry(*this);
Chris@102 106 worker_thread();
Chris@102 107 }
Chris@102 108 #endif
Chris@102 109 void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
Chris@102 110 {
Chris@102 111 at_thread_entry(*this);
Chris@102 112 worker_thread();
Chris@102 113 }
Chris@102 114 template <class AtThreadEntry>
Chris@102 115 void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
Chris@102 116 {
Chris@102 117 at_thread_entry(*this);
Chris@102 118 worker_thread();
Chris@102 119 }
Chris@102 120 static void do_nothing_at_thread_entry(basic_thread_pool&) {}
Chris@102 121
Chris@102 122 public:
Chris@102 123 /// basic_thread_pool is not copyable.
Chris@102 124 BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
Chris@102 125
Chris@102 126 /**
Chris@102 127 * \b Effects: creates a thread pool that runs closures on \c thread_count threads.
Chris@102 128 *
Chris@102 129 * \b Throws: Whatever exception is thrown while initializing the needed resources.
Chris@102 130 */
Chris@102 131 basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
Chris@102 132 {
Chris@102 133 try
Chris@102 134 {
Chris@102 135 threads.reserve(thread_count);
Chris@102 136 for (unsigned i = 0; i < thread_count; ++i)
Chris@102 137 {
Chris@102 138 #if 1
Chris@102 139 thread th (&basic_thread_pool::worker_thread, this);
Chris@102 140 threads.push_back(thread_t(boost::move(th)));
Chris@102 141 #else
Chris@102 142 threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
Chris@102 143 #endif
Chris@102 144 }
Chris@102 145 }
Chris@102 146 catch (...)
Chris@102 147 {
Chris@102 148 close();
Chris@102 149 throw;
Chris@102 150 }
Chris@102 151 }
Chris@102 152 /**
Chris@102 153 * \b Effects: creates a thread pool that runs closures on \c thread_count threads
Chris@102 154 * and executes the at_thread_entry function at the entry of each created thread. .
Chris@102 155 *
Chris@102 156 * \b Throws: Whatever exception is thrown while initializing the needed resources.
Chris@102 157 */
Chris@102 158 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
Chris@102 159 template <class AtThreadEntry>
Chris@102 160 basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
Chris@102 161 {
Chris@102 162 try
Chris@102 163 {
Chris@102 164 threads.reserve(thread_count);
Chris@102 165 for (unsigned i = 0; i < thread_count; ++i)
Chris@102 166 {
Chris@102 167 thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
Chris@102 168 threads.push_back(thread_t(boost::move(th)));
Chris@102 169 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
Chris@102 170 }
Chris@102 171 }
Chris@102 172 catch (...)
Chris@102 173 {
Chris@102 174 close();
Chris@102 175 throw;
Chris@102 176 }
Chris@102 177 }
Chris@102 178 #endif
Chris@102 179 basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
Chris@102 180 {
Chris@102 181 try
Chris@102 182 {
Chris@102 183 threads.reserve(thread_count);
Chris@102 184 for (unsigned i = 0; i < thread_count; ++i)
Chris@102 185 {
Chris@102 186 thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
Chris@102 187 threads.push_back(thread_t(boost::move(th)));
Chris@102 188 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
Chris@102 189 }
Chris@102 190 }
Chris@102 191 catch (...)
Chris@102 192 {
Chris@102 193 close();
Chris@102 194 throw;
Chris@102 195 }
Chris@102 196 }
Chris@102 197 template <class AtThreadEntry>
Chris@102 198 basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
Chris@102 199 {
Chris@102 200 try
Chris@102 201 {
Chris@102 202 threads.reserve(thread_count);
Chris@102 203 for (unsigned i = 0; i < thread_count; ++i)
Chris@102 204 {
Chris@102 205 thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
Chris@102 206 threads.push_back(thread_t(boost::move(th)));
Chris@102 207 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
Chris@102 208 }
Chris@102 209 }
Chris@102 210 catch (...)
Chris@102 211 {
Chris@102 212 close();
Chris@102 213 throw;
Chris@102 214 }
Chris@102 215 }
Chris@102 216 /**
Chris@102 217 * \b Effects: Destroys the thread pool.
Chris@102 218 *
Chris@102 219 * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
Chris@102 220 */
Chris@102 221 ~basic_thread_pool()
Chris@102 222 {
Chris@102 223 // signal to all the worker threads that there will be no more submissions.
Chris@102 224 close();
Chris@102 225 // joins all the threads as the threads were scoped_threads
Chris@102 226 }
Chris@102 227
Chris@102 228 /**
Chris@102 229 * \b Effects: join all the threads.
Chris@102 230 */
Chris@102 231 void join()
Chris@102 232 {
Chris@102 233 for (unsigned i = 0; i < threads.size(); ++i)
Chris@102 234 {
Chris@102 235 threads[i].join();
Chris@102 236 }
Chris@102 237 }
Chris@102 238
Chris@102 239 /**
Chris@102 240 * \b Effects: close the \c basic_thread_pool for submissions.
Chris@102 241 * The worker threads will work until there is no more closures to run.
Chris@102 242 */
Chris@102 243 void close()
Chris@102 244 {
Chris@102 245 work_queue.close();
Chris@102 246 }
Chris@102 247
Chris@102 248 /**
Chris@102 249 * \b Returns: whether the pool is closed for submissions.
Chris@102 250 */
Chris@102 251 bool closed()
Chris@102 252 {
Chris@102 253 return work_queue.closed();
Chris@102 254 }
Chris@102 255
Chris@102 256 /**
Chris@102 257 * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
Chris@102 258 *
Chris@102 259 * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
Chris@102 260 * If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads.
Chris@102 261 *
Chris@102 262 * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
Chris@102 263 *
Chris@102 264 * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
Chris@102 265 * Whatever exception that can be throw while storing the closure.
Chris@102 266 */
Chris@102 267
Chris@102 268 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
Chris@102 269 template <typename Closure>
Chris@102 270 void submit(Closure & closure)
Chris@102 271 {
Chris@102 272 work_queue.push(work(closure));
Chris@102 273 }
Chris@102 274 #endif
Chris@102 275 void submit(void (*closure)())
Chris@102 276 {
Chris@102 277 work_queue.push(work(closure));
Chris@102 278 }
Chris@102 279
Chris@102 280 template <typename Closure>
Chris@102 281 void submit(BOOST_THREAD_RV_REF(Closure) closure)
Chris@102 282 {
Chris@102 283 work_queue.push(work(boost::forward<Closure>(closure)));
Chris@102 284 }
Chris@102 285
Chris@102 286 /**
Chris@102 287 * \b Requires: This must be called from an scheduled task.
Chris@102 288 *
Chris@102 289 * \b Effects: reschedule functions until pred()
Chris@102 290 */
Chris@102 291 template <typename Pred>
Chris@102 292 bool reschedule_until(Pred const& pred)
Chris@102 293 {
Chris@102 294 do {
Chris@102 295 if ( ! try_executing_one())
Chris@102 296 {
Chris@102 297 return false;
Chris@102 298 }
Chris@102 299 } while (! pred());
Chris@102 300 return true;
Chris@102 301 }
Chris@102 302
Chris@102 303 };
Chris@102 304 }
Chris@102 305 using executors::basic_thread_pool;
Chris@102 306
Chris@102 307 }
Chris@102 308
Chris@102 309 #include <boost/config/abi_suffix.hpp>
Chris@102 310
Chris@102 311 #endif