Chris@102
|
1 // Copyright (C) 2013-2014 Vicente J. Botet Escriba
|
Chris@102
|
2 //
|
Chris@102
|
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
|
Chris@102
|
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
Chris@102
|
5 //
|
Chris@102
|
6 // 2013/09 Vicente J. Botet Escriba
|
Chris@102
|
7 // Adapt to boost from CCIA C++11 implementation
|
Chris@102
|
8 // first implementation of a simple thread pool using a vector of threads and a sync_queue.
|
Chris@102
|
9
|
Chris@102
|
10 #ifndef BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
|
Chris@102
|
11 #define BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
|
Chris@102
|
12
|
Chris@102
|
13 #include <boost/thread/detail/config.hpp>
|
Chris@102
|
14 #include <boost/thread/detail/delete.hpp>
|
Chris@102
|
15 #include <boost/thread/detail/move.hpp>
|
Chris@102
|
16 #include <boost/thread/scoped_thread.hpp>
|
Chris@102
|
17 #include <boost/thread/concurrent_queues/sync_queue.hpp>
|
Chris@102
|
18 #include <boost/thread/executors/work.hpp>
|
Chris@102
|
19 #include <boost/thread/csbl/vector.hpp>
|
Chris@102
|
20
|
Chris@102
|
21 #include <boost/config/abi_prefix.hpp>
|
Chris@102
|
22
|
Chris@102
|
23 namespace boost
|
Chris@102
|
24 {
|
Chris@102
|
25 namespace executors
|
Chris@102
|
26 {
|
Chris@102
|
27 class basic_thread_pool
|
Chris@102
|
28 {
|
Chris@102
|
29 public:
|
Chris@102
|
30 /// type-erasure to store the works to do
|
Chris@102
|
31 typedef executors::work work;
|
Chris@102
|
32 private:
|
Chris@102
|
33 /// the kind of stored threads are scoped threads to ensure that the threads are joined.
|
Chris@102
|
34 /// A move aware vector type
|
Chris@102
|
35 typedef scoped_thread<> thread_t;
|
Chris@102
|
36 typedef csbl::vector<thread_t> thread_vector;
|
Chris@102
|
37
|
Chris@102
|
38 /// the thread safe work queue
|
Chris@102
|
39 concurrent::sync_queue<work > work_queue;
|
Chris@102
|
40 /// A move aware vector
|
Chris@102
|
41 thread_vector threads;
|
Chris@102
|
42
|
Chris@102
|
43 public:
|
Chris@102
|
44 /**
|
Chris@102
|
45 * Effects: try to execute one task.
|
Chris@102
|
46 * Returns: whether a task has been executed.
|
Chris@102
|
47 * Throws: whatever the current task constructor throws or the task() throws.
|
Chris@102
|
48 */
|
Chris@102
|
49 bool try_executing_one()
|
Chris@102
|
50 {
|
Chris@102
|
51 try
|
Chris@102
|
52 {
|
Chris@102
|
53 work task;
|
Chris@102
|
54 if (work_queue.try_pull(task) == queue_op_status::success)
|
Chris@102
|
55 {
|
Chris@102
|
56 task();
|
Chris@102
|
57 return true;
|
Chris@102
|
58 }
|
Chris@102
|
59 return false;
|
Chris@102
|
60 }
|
Chris@102
|
61 catch (...)
|
Chris@102
|
62 {
|
Chris@102
|
63 std::terminate();
|
Chris@102
|
64 return false;
|
Chris@102
|
65 }
|
Chris@102
|
66 }
|
Chris@102
|
67 /**
|
Chris@102
|
68 * Effects: schedule one task or yields
|
Chris@102
|
69 * Throws: whatever the current task constructor throws or the task() throws.
|
Chris@102
|
70 */
|
Chris@102
|
71 void schedule_one_or_yield()
|
Chris@102
|
72 {
|
Chris@102
|
73 if ( ! try_executing_one())
|
Chris@102
|
74 {
|
Chris@102
|
75 this_thread::yield();
|
Chris@102
|
76 }
|
Chris@102
|
77 }
|
Chris@102
|
78 private:
|
Chris@102
|
79
|
Chris@102
|
80 /**
|
Chris@102
|
81 * The main loop of the worker threads
|
Chris@102
|
82 */
|
Chris@102
|
83 void worker_thread()
|
Chris@102
|
84 {
|
Chris@102
|
85 try
|
Chris@102
|
86 {
|
Chris@102
|
87 for(;;)
|
Chris@102
|
88 {
|
Chris@102
|
89 work task;
|
Chris@102
|
90 queue_op_status st = work_queue.wait_pull(task);
|
Chris@102
|
91 if (st == queue_op_status::closed) return;
|
Chris@102
|
92 task();
|
Chris@102
|
93 }
|
Chris@102
|
94 }
|
Chris@102
|
95 catch (...)
|
Chris@102
|
96 {
|
Chris@102
|
97 std::terminate();
|
Chris@102
|
98 return;
|
Chris@102
|
99 }
|
Chris@102
|
100 }
|
Chris@102
|
101 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
|
Chris@102
|
102 template <class AtThreadEntry>
|
Chris@102
|
103 void worker_thread1(AtThreadEntry& at_thread_entry)
|
Chris@102
|
104 {
|
Chris@102
|
105 at_thread_entry(*this);
|
Chris@102
|
106 worker_thread();
|
Chris@102
|
107 }
|
Chris@102
|
108 #endif
|
Chris@102
|
109 void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
|
Chris@102
|
110 {
|
Chris@102
|
111 at_thread_entry(*this);
|
Chris@102
|
112 worker_thread();
|
Chris@102
|
113 }
|
Chris@102
|
114 template <class AtThreadEntry>
|
Chris@102
|
115 void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
|
Chris@102
|
116 {
|
Chris@102
|
117 at_thread_entry(*this);
|
Chris@102
|
118 worker_thread();
|
Chris@102
|
119 }
|
Chris@102
|
120 static void do_nothing_at_thread_entry(basic_thread_pool&) {}
|
Chris@102
|
121
|
Chris@102
|
122 public:
|
Chris@102
|
123 /// basic_thread_pool is not copyable.
|
Chris@102
|
124 BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
|
Chris@102
|
125
|
Chris@102
|
126 /**
|
Chris@102
|
127 * \b Effects: creates a thread pool that runs closures on \c thread_count threads.
|
Chris@102
|
128 *
|
Chris@102
|
129 * \b Throws: Whatever exception is thrown while initializing the needed resources.
|
Chris@102
|
130 */
|
Chris@102
|
131 basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
|
Chris@102
|
132 {
|
Chris@102
|
133 try
|
Chris@102
|
134 {
|
Chris@102
|
135 threads.reserve(thread_count);
|
Chris@102
|
136 for (unsigned i = 0; i < thread_count; ++i)
|
Chris@102
|
137 {
|
Chris@102
|
138 #if 1
|
Chris@102
|
139 thread th (&basic_thread_pool::worker_thread, this);
|
Chris@102
|
140 threads.push_back(thread_t(boost::move(th)));
|
Chris@102
|
141 #else
|
Chris@102
|
142 threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
|
Chris@102
|
143 #endif
|
Chris@102
|
144 }
|
Chris@102
|
145 }
|
Chris@102
|
146 catch (...)
|
Chris@102
|
147 {
|
Chris@102
|
148 close();
|
Chris@102
|
149 throw;
|
Chris@102
|
150 }
|
Chris@102
|
151 }
|
Chris@102
|
152 /**
|
Chris@102
|
153 * \b Effects: creates a thread pool that runs closures on \c thread_count threads
|
Chris@102
|
154 * and executes the at_thread_entry function at the entry of each created thread. .
|
Chris@102
|
155 *
|
Chris@102
|
156 * \b Throws: Whatever exception is thrown while initializing the needed resources.
|
Chris@102
|
157 */
|
Chris@102
|
158 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
|
Chris@102
|
159 template <class AtThreadEntry>
|
Chris@102
|
160 basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
|
Chris@102
|
161 {
|
Chris@102
|
162 try
|
Chris@102
|
163 {
|
Chris@102
|
164 threads.reserve(thread_count);
|
Chris@102
|
165 for (unsigned i = 0; i < thread_count; ++i)
|
Chris@102
|
166 {
|
Chris@102
|
167 thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
|
Chris@102
|
168 threads.push_back(thread_t(boost::move(th)));
|
Chris@102
|
169 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
|
Chris@102
|
170 }
|
Chris@102
|
171 }
|
Chris@102
|
172 catch (...)
|
Chris@102
|
173 {
|
Chris@102
|
174 close();
|
Chris@102
|
175 throw;
|
Chris@102
|
176 }
|
Chris@102
|
177 }
|
Chris@102
|
178 #endif
|
Chris@102
|
179 basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
|
Chris@102
|
180 {
|
Chris@102
|
181 try
|
Chris@102
|
182 {
|
Chris@102
|
183 threads.reserve(thread_count);
|
Chris@102
|
184 for (unsigned i = 0; i < thread_count; ++i)
|
Chris@102
|
185 {
|
Chris@102
|
186 thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
|
Chris@102
|
187 threads.push_back(thread_t(boost::move(th)));
|
Chris@102
|
188 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
|
Chris@102
|
189 }
|
Chris@102
|
190 }
|
Chris@102
|
191 catch (...)
|
Chris@102
|
192 {
|
Chris@102
|
193 close();
|
Chris@102
|
194 throw;
|
Chris@102
|
195 }
|
Chris@102
|
196 }
|
Chris@102
|
197 template <class AtThreadEntry>
|
Chris@102
|
198 basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
|
Chris@102
|
199 {
|
Chris@102
|
200 try
|
Chris@102
|
201 {
|
Chris@102
|
202 threads.reserve(thread_count);
|
Chris@102
|
203 for (unsigned i = 0; i < thread_count; ++i)
|
Chris@102
|
204 {
|
Chris@102
|
205 thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
|
Chris@102
|
206 threads.push_back(thread_t(boost::move(th)));
|
Chris@102
|
207 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
|
Chris@102
|
208 }
|
Chris@102
|
209 }
|
Chris@102
|
210 catch (...)
|
Chris@102
|
211 {
|
Chris@102
|
212 close();
|
Chris@102
|
213 throw;
|
Chris@102
|
214 }
|
Chris@102
|
215 }
|
Chris@102
|
216 /**
|
Chris@102
|
217 * \b Effects: Destroys the thread pool.
|
Chris@102
|
218 *
|
Chris@102
|
219 * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
|
Chris@102
|
220 */
|
Chris@102
|
221 ~basic_thread_pool()
|
Chris@102
|
222 {
|
Chris@102
|
223 // signal to all the worker threads that there will be no more submissions.
|
Chris@102
|
224 close();
|
Chris@102
|
225 // joins all the threads as the threads were scoped_threads
|
Chris@102
|
226 }
|
Chris@102
|
227
|
Chris@102
|
228 /**
|
Chris@102
|
229 * \b Effects: join all the threads.
|
Chris@102
|
230 */
|
Chris@102
|
231 void join()
|
Chris@102
|
232 {
|
Chris@102
|
233 for (unsigned i = 0; i < threads.size(); ++i)
|
Chris@102
|
234 {
|
Chris@102
|
235 threads[i].join();
|
Chris@102
|
236 }
|
Chris@102
|
237 }
|
Chris@102
|
238
|
Chris@102
|
239 /**
|
Chris@102
|
240 * \b Effects: close the \c basic_thread_pool for submissions.
|
Chris@102
|
241 * The worker threads will work until there is no more closures to run.
|
Chris@102
|
242 */
|
Chris@102
|
243 void close()
|
Chris@102
|
244 {
|
Chris@102
|
245 work_queue.close();
|
Chris@102
|
246 }
|
Chris@102
|
247
|
Chris@102
|
248 /**
|
Chris@102
|
249 * \b Returns: whether the pool is closed for submissions.
|
Chris@102
|
250 */
|
Chris@102
|
251 bool closed()
|
Chris@102
|
252 {
|
Chris@102
|
253 return work_queue.closed();
|
Chris@102
|
254 }
|
Chris@102
|
255
|
Chris@102
|
256 /**
|
Chris@102
|
257 * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
|
Chris@102
|
258 *
|
Chris@102
|
259 * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
|
Chris@102
|
260 * If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads.
|
Chris@102
|
261 *
|
Chris@102
|
262 * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
|
Chris@102
|
263 *
|
Chris@102
|
264 * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
|
Chris@102
|
265 * Whatever exception that can be throw while storing the closure.
|
Chris@102
|
266 */
|
Chris@102
|
267
|
Chris@102
|
268 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
|
Chris@102
|
269 template <typename Closure>
|
Chris@102
|
270 void submit(Closure & closure)
|
Chris@102
|
271 {
|
Chris@102
|
272 work_queue.push(work(closure));
|
Chris@102
|
273 }
|
Chris@102
|
274 #endif
|
Chris@102
|
275 void submit(void (*closure)())
|
Chris@102
|
276 {
|
Chris@102
|
277 work_queue.push(work(closure));
|
Chris@102
|
278 }
|
Chris@102
|
279
|
Chris@102
|
280 template <typename Closure>
|
Chris@102
|
281 void submit(BOOST_THREAD_RV_REF(Closure) closure)
|
Chris@102
|
282 {
|
Chris@102
|
283 work_queue.push(work(boost::forward<Closure>(closure)));
|
Chris@102
|
284 }
|
Chris@102
|
285
|
Chris@102
|
286 /**
|
Chris@102
|
287 * \b Requires: This must be called from an scheduled task.
|
Chris@102
|
288 *
|
Chris@102
|
289 * \b Effects: reschedule functions until pred()
|
Chris@102
|
290 */
|
Chris@102
|
291 template <typename Pred>
|
Chris@102
|
292 bool reschedule_until(Pred const& pred)
|
Chris@102
|
293 {
|
Chris@102
|
294 do {
|
Chris@102
|
295 if ( ! try_executing_one())
|
Chris@102
|
296 {
|
Chris@102
|
297 return false;
|
Chris@102
|
298 }
|
Chris@102
|
299 } while (! pred());
|
Chris@102
|
300 return true;
|
Chris@102
|
301 }
|
Chris@102
|
302
|
Chris@102
|
303 };
|
Chris@102
|
304 }
|
Chris@102
|
305 using executors::basic_thread_pool;
|
Chris@102
|
306
|
Chris@102
|
307 }
|
Chris@102
|
308
|
Chris@102
|
309 #include <boost/config/abi_suffix.hpp>
|
Chris@102
|
310
|
Chris@102
|
311 #endif
|