comparison DEPENDENCIES/generic/include/boost/thread/sync_queue.hpp @ 16:2665513ce2d3

Add boost headers
author Chris Cannam
date Tue, 05 Aug 2014 11:11:38 +0100
parents
children c530137014c0
comparison
equal deleted inserted replaced
15:663ca0da4350 16:2665513ce2d3
1 #ifndef BOOST_THREAD_SYNC_QUEUE_HPP
2 #define BOOST_THREAD_SYNC_QUEUE_HPP
3
4 //////////////////////////////////////////////////////////////////////////////
5 //
6 // (C) Copyright Vicente J. Botet Escriba 2013. Distributed under the Boost
7 // Software License, Version 1.0. (See accompanying file
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 // See http://www.boost.org/libs/thread for documentation.
11 //
12 //////////////////////////////////////////////////////////////////////////////
13
14 #include <boost/thread/detail/config.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/mutex.hpp>
17 #include <boost/thread/detail/move.hpp>
18 #include <boost/throw_exception.hpp>
19 #include <boost/smart_ptr/shared_ptr.hpp>
20 #include <boost/smart_ptr/make_shared.hpp>
21
22 #include <boost/thread/sync_bounded_queue.hpp>
23 #include <boost/container/deque.hpp>
24
25 #include <boost/config/abi_prefix.hpp>
26
27 namespace boost
28 {
29
30
31 template <typename ValueType>
32 class sync_queue
33 {
34 public:
35 typedef ValueType value_type;
36 typedef std::size_t size_type;
37
38 // Constructors/Assignment/Destructors
39 BOOST_THREAD_NO_COPYABLE(sync_queue)
40 inline sync_queue();
41 //template <typename Range>
42 //inline explicit sync_queue(Range range);
43 inline ~sync_queue();
44
45 // Observers
46 inline bool empty() const;
47 inline bool full() const;
48 inline size_type size() const;
49 inline bool closed() const;
50
51 // Modifiers
52 inline void close();
53
54 inline void push(const value_type& x);
55 inline void push(BOOST_THREAD_RV_REF(value_type) x);
56 inline bool try_push(const value_type& x);
57 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x);
58 inline bool try_push(no_block_tag, const value_type& x);
59 inline bool try_push(no_block_tag, BOOST_THREAD_RV_REF(value_type) x);
60
61 // Observers/Modifiers
62 inline void pull(value_type&);
63 inline void pull(ValueType& elem, bool & closed);
64 // enable_if is_nothrow_copy_movable<value_type>
65 inline value_type pull();
66 inline shared_ptr<ValueType> ptr_pull();
67 inline bool try_pull(value_type&);
68 inline bool try_pull(no_block_tag,value_type&);
69 inline shared_ptr<ValueType> try_pull();
70
71 private:
72 mutable mutex mtx_;
73 condition_variable not_empty_;
74 size_type waiting_empty_;
75 boost::container::deque<ValueType> data_;
76 bool closed_;
77
78 inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
79 {
80 return data_.empty();
81 }
82 inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
83 {
84 return data_.empty();
85 }
86
87 inline size_type size(lock_guard<mutex>& ) const BOOST_NOEXCEPT
88 {
89 return data_.size();
90 }
91
92 inline void throw_if_closed(unique_lock<mutex>&);
93
94 inline bool try_pull(value_type& x, unique_lock<mutex>& lk);
95 inline bool try_push(const value_type& x, unique_lock<mutex>& lk);
96 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
97 inline shared_ptr<value_type> try_pull(unique_lock<mutex>& lk);
98
99 inline void wait_until_not_empty(unique_lock<mutex>& lk);
100 inline void wait_until_not_empty(unique_lock<mutex>& lk, bool&);
101
102 inline void notify_not_empty_if_needed(unique_lock<mutex>& lk)
103 {
104 if (waiting_empty_ > 0)
105 {
106 --waiting_empty_;
107 lk.unlock();
108 not_empty_.notify_one();
109 }
110 }
111
112 inline void pull(value_type& elem, unique_lock<mutex>& )
113 {
114 elem = boost::move(data_.front());
115 data_.pop_front();
116 }
117 inline boost::shared_ptr<value_type> ptr_pull(unique_lock<mutex>& )
118 {
119 shared_ptr<value_type> res = make_shared<value_type>(boost::move(data_.front()));
120 data_.pop_front();
121 return res;
122 }
123
124 inline void push(const value_type& elem, unique_lock<mutex>& lk)
125 {
126 data_.push_back(elem);
127 notify_not_empty_if_needed(lk);
128 }
129
130 inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk)
131 {
132 data_.push(boost::move(elem));
133 notify_not_empty_if_needed(lk);
134 }
135 };
136
137 template <typename ValueType>
138 sync_queue<ValueType>::sync_queue() :
139 waiting_empty_(0), data_(), closed_(false)
140 {
141 BOOST_ASSERT(data_.empty());
142 }
143
144 // template <typename ValueType>
145 // template <typename Range>
146 // explicit sync_queue<ValueType>::sync_queue(Range range) :
147 // waiting_empty_(0), data_(), closed_(false)
148 // {
149 // try
150 // {
151 // typedef typename Range::iterator iterator_t;
152 // iterator_t first = boost::begin(range);
153 // iterator_t end = boost::end(range);
154 // for (iterator_t cur = first; cur != end; ++cur)
155 // {
156 // data_.push(boost::move(*cur));;
157 // }
158 // notify_not_empty_if_needed(lk);
159 // }
160 // catch (...)
161 // {
162 // delete[] data_;
163 // }
164 // }
165
166 template <typename ValueType>
167 sync_queue<ValueType>::~sync_queue()
168 {
169 }
170
171 template <typename ValueType>
172 void sync_queue<ValueType>::close()
173 {
174 {
175 lock_guard<mutex> lk(mtx_);
176 closed_ = true;
177 }
178 not_empty_.notify_all();
179 }
180
181 template <typename ValueType>
182 bool sync_queue<ValueType>::closed() const
183 {
184 lock_guard<mutex> lk(mtx_);
185 return closed_;
186 }
187
188 template <typename ValueType>
189 bool sync_queue<ValueType>::empty() const
190 {
191 lock_guard<mutex> lk(mtx_);
192 return empty(lk);
193 }
194 template <typename ValueType>
195 bool sync_queue<ValueType>::full() const
196 {
197 return false;
198 }
199
200 template <typename ValueType>
201 typename sync_queue<ValueType>::size_type sync_queue<ValueType>::size() const
202 {
203 lock_guard<mutex> lk(mtx_);
204 return size(lk);
205 }
206
207
208 template <typename ValueType>
209 bool sync_queue<ValueType>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
210 {
211 if (empty(lk))
212 {
213 throw_if_closed(lk);
214 return false;
215 }
216 pull(elem, lk);
217 return true;
218 }
219 template <typename ValueType>
220 shared_ptr<ValueType> sync_queue<ValueType>::try_pull(unique_lock<mutex>& lk)
221 {
222 if (empty(lk))
223 {
224 throw_if_closed(lk);
225 return shared_ptr<ValueType>();
226 }
227 return ptr_pull(lk);
228 }
229
230 template <typename ValueType>
231 bool sync_queue<ValueType>::try_pull(ValueType& elem)
232 {
233 try
234 {
235 unique_lock<mutex> lk(mtx_);
236 return try_pull(elem, lk);
237 }
238 catch (...)
239 {
240 close();
241 throw;
242 }
243 }
244
245 template <typename ValueType>
246 bool sync_queue<ValueType>::try_pull(no_block_tag,ValueType& elem)
247 {
248 try
249 {
250 unique_lock<mutex> lk(mtx_, try_to_lock);
251 if (!lk.owns_lock())
252 {
253 return false;
254 }
255 return try_pull(elem, lk);
256 }
257 catch (...)
258 {
259 close();
260 throw;
261 }
262 }
263 template <typename ValueType>
264 boost::shared_ptr<ValueType> sync_queue<ValueType>::try_pull()
265 {
266 try
267 {
268 unique_lock<mutex> lk(mtx_);
269 return try_pull(lk);
270 }
271 catch (...)
272 {
273 close();
274 throw;
275 }
276 }
277
278 template <typename ValueType>
279 void sync_queue<ValueType>::throw_if_closed(unique_lock<mutex>&)
280 {
281 if (closed_)
282 {
283 BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
284 }
285 }
286
287 template <typename ValueType>
288 void sync_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk)
289 {
290 for (;;)
291 {
292 if (! empty(lk)) break;
293 throw_if_closed(lk);
294 ++waiting_empty_;
295 not_empty_.wait(lk);
296 }
297 }
298 template <typename ValueType>
299 void sync_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk, bool & closed)
300 {
301 for (;;)
302 {
303 if (! empty(lk)) break;
304 if (closed_) {closed=true; return;}
305 ++waiting_empty_;
306 not_empty_.wait(lk);
307 }
308 closed=false;
309 }
310
311 template <typename ValueType>
312 void sync_queue<ValueType>::pull(ValueType& elem)
313 {
314 try
315 {
316 unique_lock<mutex> lk(mtx_);
317 wait_until_not_empty(lk);
318 pull(elem, lk);
319 }
320 catch (...)
321 {
322 close();
323 throw;
324 }
325 }
326 template <typename ValueType>
327 void sync_queue<ValueType>::pull(ValueType& elem, bool & closed)
328 {
329 try
330 {
331 unique_lock<mutex> lk(mtx_);
332 wait_until_not_empty(lk, closed);
333 if (closed) {return;}
334 pull(elem, lk);
335 }
336 catch (...)
337 {
338 close();
339 throw;
340 }
341 }
342
343 // enable if ValueType is nothrow movable
344 template <typename ValueType>
345 ValueType sync_queue<ValueType>::pull()
346 {
347 try
348 {
349 value_type elem;
350 pull(elem);
351 return boost::move(elem);
352 }
353 catch (...)
354 {
355 close();
356 throw;
357 }
358 }
359 template <typename ValueType>
360 boost::shared_ptr<ValueType> sync_queue<ValueType>::ptr_pull()
361 {
362 try
363 {
364 unique_lock<mutex> lk(mtx_);
365 wait_until_not_empty(lk);
366 return ptr_pull(lk);
367 }
368 catch (...)
369 {
370 close();
371 throw;
372 }
373 }
374
375 template <typename ValueType>
376 bool sync_queue<ValueType>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
377 {
378 throw_if_closed(lk);
379 push(elem, lk);
380 return true;
381 }
382
383 template <typename ValueType>
384 bool sync_queue<ValueType>::try_push(const ValueType& elem)
385 {
386 try
387 {
388 unique_lock<mutex> lk(mtx_);
389 return try_push(elem, lk);
390 }
391 catch (...)
392 {
393 close();
394 throw;
395 }
396 }
397
398 template <typename ValueType>
399 bool sync_queue<ValueType>::try_push(no_block_tag, const ValueType& elem)
400 {
401 try
402 {
403 unique_lock<mutex> lk(mtx_, try_to_lock);
404 if (!lk.owns_lock()) return false;
405 return try_push(elem, lk);
406 }
407 catch (...)
408 {
409 close();
410 throw;
411 }
412 }
413
414 template <typename ValueType>
415 void sync_queue<ValueType>::push(const ValueType& elem)
416 {
417 try
418 {
419 unique_lock<mutex> lk(mtx_);
420 throw_if_closed(lk);
421 push(elem, lk);
422 }
423 catch (...)
424 {
425 close();
426 throw;
427 }
428 }
429
430 template <typename ValueType>
431 bool sync_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
432 {
433 throw_if_closed(lk);
434 push(boost::forward<ValueType>(elem), lk);
435 return true;
436 }
437
438 template <typename ValueType>
439 bool sync_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
440 {
441 try
442 {
443 unique_lock<mutex> lk(mtx_);
444 return try_push(elem, lk);
445 }
446 catch (...)
447 {
448 close();
449 throw;
450 }
451 }
452
453 template <typename ValueType>
454 bool sync_queue<ValueType>::try_push(no_block_tag, BOOST_THREAD_RV_REF(ValueType) elem)
455 {
456 try
457 {
458 unique_lock<mutex> lk(mtx_, try_to_lock);
459 if (!lk.owns_lock())
460 {
461 return false;
462 }
463 return try_push(elem, lk);
464 }
465 catch (...)
466 {
467 close();
468 throw;
469 }
470 }
471
472 template <typename ValueType>
473 void sync_queue<ValueType>::push(BOOST_THREAD_RV_REF(ValueType) elem)
474 {
475 try
476 {
477 unique_lock<mutex> lk(mtx_);
478 throw_if_closed(lk);
479 push(elem, lk);
480 }
481 catch (...)
482 {
483 close();
484 throw;
485 }
486 }
487
488 template <typename ValueType>
489 sync_queue<ValueType>& operator<<(sync_queue<ValueType>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
490 {
491 sbq.push(boost::forward<ValueType>(elem));
492 return sbq;
493 }
494
495 template <typename ValueType>
496 sync_queue<ValueType>& operator<<(sync_queue<ValueType>& sbq, ValueType const&elem)
497 {
498 sbq.push(elem);
499 return sbq;
500 }
501
502 template <typename ValueType>
503 sync_queue<ValueType>& operator>>(sync_queue<ValueType>& sbq, ValueType &elem)
504 {
505 sbq.pull(elem);
506 return sbq;
507 }
508
509 }
510
511 #include <boost/config/abi_suffix.hpp>
512
513 #endif