Chris@102
|
1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP
|
Chris@102
|
2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP
|
Chris@102
|
3
|
Chris@102
|
4 //////////////////////////////////////////////////////////////////////////////
|
Chris@102
|
5 //
|
Chris@102
|
6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
|
Chris@102
|
7 // Software License, Version 1.0. (See accompanying file
|
Chris@102
|
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
Chris@102
|
9 //
|
Chris@102
|
10 // See http://www.boost.org/libs/thread for documentation.
|
Chris@102
|
11 //
|
Chris@102
|
12 //////////////////////////////////////////////////////////////////////////////
|
Chris@102
|
13
|
Chris@102
|
14 #include <boost/thread/detail/config.hpp>
|
Chris@102
|
15 #include <boost/thread/condition_variable.hpp>
|
Chris@102
|
16 #include <boost/thread/mutex.hpp>
|
Chris@102
|
17 #include <boost/thread/detail/move.hpp>
|
Chris@102
|
18 #include <boost/throw_exception.hpp>
|
Chris@102
|
19 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
|
Chris@102
|
20
|
Chris@102
|
21 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
22 #include <boost/smart_ptr/shared_ptr.hpp>
|
Chris@102
|
23 #include <boost/smart_ptr/make_shared.hpp>
|
Chris@102
|
24 #endif
|
Chris@102
|
25 #include <boost/config/abi_prefix.hpp>
|
Chris@102
|
26
|
Chris@102
|
27 namespace boost
|
Chris@102
|
28 {
|
Chris@102
|
29 namespace concurrent
|
Chris@102
|
30 {
|
Chris@102
|
31 template <typename ValueType>
|
Chris@102
|
32 class sync_bounded_queue
|
Chris@102
|
33 {
|
Chris@102
|
34 public:
|
Chris@102
|
35 typedef ValueType value_type;
|
Chris@102
|
36 typedef std::size_t size_type;
|
Chris@102
|
37
|
Chris@102
|
38 // Constructors/Assignment/Destructors
|
Chris@102
|
39 BOOST_THREAD_NO_COPYABLE(sync_bounded_queue)
|
Chris@102
|
40 explicit sync_bounded_queue(size_type max_elems);
|
Chris@102
|
41 template <typename Range>
|
Chris@102
|
42 sync_bounded_queue(size_type max_elems, Range range);
|
Chris@102
|
43 ~sync_bounded_queue();
|
Chris@102
|
44
|
Chris@102
|
45 // Observers
|
Chris@102
|
46 inline bool empty() const;
|
Chris@102
|
47 inline bool full() const;
|
Chris@102
|
48 inline size_type capacity() const;
|
Chris@102
|
49 inline size_type size() const;
|
Chris@102
|
50 inline bool closed() const;
|
Chris@102
|
51
|
Chris@102
|
52 // Modifiers
|
Chris@102
|
53 inline void close();
|
Chris@102
|
54
|
Chris@102
|
55 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
56 inline void push(const value_type& x);
|
Chris@102
|
57 inline void push(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
58 inline bool try_push(const value_type& x);
|
Chris@102
|
59 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
60 inline bool try_push(no_block_tag, const value_type& x);
|
Chris@102
|
61 inline bool try_push(no_block_tag, BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
62 #endif
|
Chris@102
|
63 inline void push_back(const value_type& x);
|
Chris@102
|
64 inline void push_back(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
65 inline queue_op_status try_push_back(const value_type& x);
|
Chris@102
|
66 inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
67 inline queue_op_status nonblocking_push_back(const value_type& x);
|
Chris@102
|
68 inline queue_op_status nonblocking_push_back(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
69 inline queue_op_status wait_push_back(const value_type& x);
|
Chris@102
|
70 inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
71
|
Chris@102
|
72 // Observers/Modifiers
|
Chris@102
|
73 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
74 inline void pull(value_type&);
|
Chris@102
|
75 // enable_if is_nothrow_copy_movable<value_type>
|
Chris@102
|
76 inline value_type pull();
|
Chris@102
|
77 inline shared_ptr<ValueType> ptr_pull();
|
Chris@102
|
78 inline bool try_pull(value_type&);
|
Chris@102
|
79 inline bool try_pull(no_block_tag,value_type&);
|
Chris@102
|
80 inline shared_ptr<ValueType> try_pull();
|
Chris@102
|
81 #endif
|
Chris@102
|
82 inline void pull_front(value_type&);
|
Chris@102
|
83 // enable_if is_nothrow_copy_movable<value_type>
|
Chris@102
|
84 inline value_type pull_front();
|
Chris@102
|
85 inline queue_op_status try_pull_front(value_type&);
|
Chris@102
|
86 inline queue_op_status nonblocking_pull_front(value_type&);
|
Chris@102
|
87
|
Chris@102
|
88 inline queue_op_status wait_pull_front(ValueType& elem);
|
Chris@102
|
89
|
Chris@102
|
90 private:
|
Chris@102
|
91 mutable mutex mtx_;
|
Chris@102
|
92 condition_variable not_empty_;
|
Chris@102
|
93 condition_variable not_full_;
|
Chris@102
|
94 size_type waiting_full_;
|
Chris@102
|
95 size_type waiting_empty_;
|
Chris@102
|
96 value_type* data_;
|
Chris@102
|
97 size_type in_;
|
Chris@102
|
98 size_type out_;
|
Chris@102
|
99 size_type capacity_;
|
Chris@102
|
100 bool closed_;
|
Chris@102
|
101
|
Chris@102
|
102 inline size_type inc(size_type idx) const BOOST_NOEXCEPT
|
Chris@102
|
103 {
|
Chris@102
|
104 return (idx + 1) % capacity_;
|
Chris@102
|
105 }
|
Chris@102
|
106
|
Chris@102
|
107 inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
|
Chris@102
|
108 {
|
Chris@102
|
109 return in_ == out_;
|
Chris@102
|
110 }
|
Chris@102
|
111 inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
|
Chris@102
|
112 {
|
Chris@102
|
113 return in_ == out_;
|
Chris@102
|
114 }
|
Chris@102
|
115 inline bool full(unique_lock<mutex>& ) const BOOST_NOEXCEPT
|
Chris@102
|
116 {
|
Chris@102
|
117 return (inc(in_) == out_);
|
Chris@102
|
118 }
|
Chris@102
|
119 inline bool full(lock_guard<mutex>& ) const BOOST_NOEXCEPT
|
Chris@102
|
120 {
|
Chris@102
|
121 return (inc(in_) == out_);
|
Chris@102
|
122 }
|
Chris@102
|
123 inline size_type capacity(lock_guard<mutex>& ) const BOOST_NOEXCEPT
|
Chris@102
|
124 {
|
Chris@102
|
125 return capacity_-1;
|
Chris@102
|
126 }
|
Chris@102
|
127 inline size_type size(lock_guard<mutex>& lk) const BOOST_NOEXCEPT
|
Chris@102
|
128 {
|
Chris@102
|
129 if (full(lk)) return capacity(lk);
|
Chris@102
|
130 return ((out_+capacity(lk)-in_) % capacity(lk));
|
Chris@102
|
131 }
|
Chris@102
|
132
|
Chris@102
|
133 inline void throw_if_closed(unique_lock<mutex>&);
|
Chris@102
|
134 inline bool closed(unique_lock<mutex>&) const;
|
Chris@102
|
135
|
Chris@102
|
136 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
137 inline bool try_pull(value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
138 inline shared_ptr<value_type> try_pull(unique_lock<mutex>& lk);
|
Chris@102
|
139 inline bool try_push(const value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
140 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
|
Chris@102
|
141 #endif
|
Chris@102
|
142 inline queue_op_status try_pull_front(value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
143 inline queue_op_status try_push_back(const value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
144 inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
|
Chris@102
|
145
|
Chris@102
|
146 inline queue_op_status wait_pull_front(value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
147 inline queue_op_status wait_push_back(const value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
148 inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
|
Chris@102
|
149
|
Chris@102
|
150 inline void wait_until_not_empty(unique_lock<mutex>& lk);
|
Chris@102
|
151 inline void wait_until_not_empty(unique_lock<mutex>& lk, bool&);
|
Chris@102
|
152 inline size_type wait_until_not_full(unique_lock<mutex>& lk);
|
Chris@102
|
153 inline size_type wait_until_not_full(unique_lock<mutex>& lk, bool&);
|
Chris@102
|
154
|
Chris@102
|
155
|
Chris@102
|
156 inline void notify_not_empty_if_needed(unique_lock<mutex>& lk)
|
Chris@102
|
157 {
|
Chris@102
|
158 if (waiting_empty_ > 0)
|
Chris@102
|
159 {
|
Chris@102
|
160 --waiting_empty_;
|
Chris@102
|
161 lk.unlock();
|
Chris@102
|
162 not_empty_.notify_one();
|
Chris@102
|
163 }
|
Chris@102
|
164 }
|
Chris@102
|
165 inline void notify_not_full_if_needed(unique_lock<mutex>& lk)
|
Chris@102
|
166 {
|
Chris@102
|
167 if (waiting_full_ > 0)
|
Chris@102
|
168 {
|
Chris@102
|
169 --waiting_full_;
|
Chris@102
|
170 lk.unlock();
|
Chris@102
|
171 not_full_.notify_one();
|
Chris@102
|
172 }
|
Chris@102
|
173 }
|
Chris@102
|
174
|
Chris@102
|
175 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
176 inline void pull(value_type& elem, unique_lock<mutex>& lk)
|
Chris@102
|
177 {
|
Chris@102
|
178 elem = boost::move(data_[out_]);
|
Chris@102
|
179 out_ = inc(out_);
|
Chris@102
|
180 notify_not_full_if_needed(lk);
|
Chris@102
|
181 }
|
Chris@102
|
182 inline value_type pull(unique_lock<mutex>& lk)
|
Chris@102
|
183 {
|
Chris@102
|
184 value_type elem = boost::move(data_[out_]);
|
Chris@102
|
185 out_ = inc(out_);
|
Chris@102
|
186 notify_not_full_if_needed(lk);
|
Chris@102
|
187 return boost::move(elem);
|
Chris@102
|
188 }
|
Chris@102
|
189 inline boost::shared_ptr<value_type> ptr_pull(unique_lock<mutex>& lk)
|
Chris@102
|
190 {
|
Chris@102
|
191 shared_ptr<value_type> res = make_shared<value_type>(boost::move(data_[out_]));
|
Chris@102
|
192 out_ = inc(out_);
|
Chris@102
|
193 notify_not_full_if_needed(lk);
|
Chris@102
|
194 return res;
|
Chris@102
|
195 }
|
Chris@102
|
196 #endif
|
Chris@102
|
197 inline void pull_front(value_type& elem, unique_lock<mutex>& lk)
|
Chris@102
|
198 {
|
Chris@102
|
199 elem = boost::move(data_[out_]);
|
Chris@102
|
200 out_ = inc(out_);
|
Chris@102
|
201 notify_not_full_if_needed(lk);
|
Chris@102
|
202 }
|
Chris@102
|
203 inline value_type pull_front(unique_lock<mutex>& lk)
|
Chris@102
|
204 {
|
Chris@102
|
205 value_type elem = boost::move(data_[out_]);
|
Chris@102
|
206 out_ = inc(out_);
|
Chris@102
|
207 notify_not_full_if_needed(lk);
|
Chris@102
|
208 return boost::move(elem);
|
Chris@102
|
209 }
|
Chris@102
|
210
|
Chris@102
|
211 inline void set_in(size_type in, unique_lock<mutex>& lk)
|
Chris@102
|
212 {
|
Chris@102
|
213 in_ = in;
|
Chris@102
|
214 notify_not_empty_if_needed(lk);
|
Chris@102
|
215 }
|
Chris@102
|
216
|
Chris@102
|
217 inline void push_at(const value_type& elem, size_type in_p_1, unique_lock<mutex>& lk)
|
Chris@102
|
218 {
|
Chris@102
|
219 data_[in_] = elem;
|
Chris@102
|
220 set_in(in_p_1, lk);
|
Chris@102
|
221 }
|
Chris@102
|
222
|
Chris@102
|
223 inline void push_at(BOOST_THREAD_RV_REF(value_type) elem, size_type in_p_1, unique_lock<mutex>& lk)
|
Chris@102
|
224 {
|
Chris@102
|
225 data_[in_] = boost::move(elem);
|
Chris@102
|
226 set_in(in_p_1, lk);
|
Chris@102
|
227 }
|
Chris@102
|
228 };
|
Chris@102
|
229
|
Chris@102
|
230 template <typename ValueType>
|
Chris@102
|
231 sync_bounded_queue<ValueType>::sync_bounded_queue(typename sync_bounded_queue<ValueType>::size_type max_elems) :
|
Chris@102
|
232 waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1),
|
Chris@102
|
233 closed_(false)
|
Chris@102
|
234 {
|
Chris@102
|
235 BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1");
|
Chris@102
|
236 }
|
Chris@102
|
237
|
Chris@102
|
238 // template <typename ValueType>
|
Chris@102
|
239 // template <typename Range>
|
Chris@102
|
240 // sync_bounded_queue<ValueType>::sync_bounded_queue(size_type max_elems, Range range) :
|
Chris@102
|
241 // waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1),
|
Chris@102
|
242 // closed_(false)
|
Chris@102
|
243 // {
|
Chris@102
|
244 // BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1");
|
Chris@102
|
245 // BOOST_ASSERT_MSG(max_elems == size(range), "number of elements must match range's size");
|
Chris@102
|
246 // try
|
Chris@102
|
247 // {
|
Chris@102
|
248 // typedef typename Range::iterator iterator_t;
|
Chris@102
|
249 // iterator_t first = boost::begin(range);
|
Chris@102
|
250 // iterator_t end = boost::end(range);
|
Chris@102
|
251 // size_type in = 0;
|
Chris@102
|
252 // for (iterator_t cur = first; cur != end; ++cur, ++in)
|
Chris@102
|
253 // {
|
Chris@102
|
254 // data_[in] = *cur;
|
Chris@102
|
255 // }
|
Chris@102
|
256 // set_in(in);
|
Chris@102
|
257 // }
|
Chris@102
|
258 // catch (...)
|
Chris@102
|
259 // {
|
Chris@102
|
260 // delete[] data_;
|
Chris@102
|
261 // }
|
Chris@102
|
262 // }
|
Chris@102
|
263
|
Chris@102
|
264 template <typename ValueType>
|
Chris@102
|
265 sync_bounded_queue<ValueType>::~sync_bounded_queue()
|
Chris@102
|
266 {
|
Chris@102
|
267 delete[] data_;
|
Chris@102
|
268 }
|
Chris@102
|
269
|
Chris@102
|
270 template <typename ValueType>
|
Chris@102
|
271 void sync_bounded_queue<ValueType>::close()
|
Chris@102
|
272 {
|
Chris@102
|
273 {
|
Chris@102
|
274 lock_guard<mutex> lk(mtx_);
|
Chris@102
|
275 closed_ = true;
|
Chris@102
|
276 }
|
Chris@102
|
277 not_empty_.notify_all();
|
Chris@102
|
278 not_full_.notify_all();
|
Chris@102
|
279 }
|
Chris@102
|
280
|
Chris@102
|
281 template <typename ValueType>
|
Chris@102
|
282 bool sync_bounded_queue<ValueType>::closed() const
|
Chris@102
|
283 {
|
Chris@102
|
284 lock_guard<mutex> lk(mtx_);
|
Chris@102
|
285 return closed_;
|
Chris@102
|
286 }
|
Chris@102
|
287 template <typename ValueType>
|
Chris@102
|
288 bool sync_bounded_queue<ValueType>::closed(unique_lock<mutex>& ) const
|
Chris@102
|
289 {
|
Chris@102
|
290 return closed_;
|
Chris@102
|
291 }
|
Chris@102
|
292
|
Chris@102
|
293 template <typename ValueType>
|
Chris@102
|
294 bool sync_bounded_queue<ValueType>::empty() const
|
Chris@102
|
295 {
|
Chris@102
|
296 lock_guard<mutex> lk(mtx_);
|
Chris@102
|
297 return empty(lk);
|
Chris@102
|
298 }
|
Chris@102
|
299 template <typename ValueType>
|
Chris@102
|
300 bool sync_bounded_queue<ValueType>::full() const
|
Chris@102
|
301 {
|
Chris@102
|
302 lock_guard<mutex> lk(mtx_);
|
Chris@102
|
303 return full(lk);
|
Chris@102
|
304 }
|
Chris@102
|
305
|
Chris@102
|
306 template <typename ValueType>
|
Chris@102
|
307 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::capacity() const
|
Chris@102
|
308 {
|
Chris@102
|
309 lock_guard<mutex> lk(mtx_);
|
Chris@102
|
310 return capacity(lk);
|
Chris@102
|
311 }
|
Chris@102
|
312
|
Chris@102
|
313 template <typename ValueType>
|
Chris@102
|
314 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::size() const
|
Chris@102
|
315 {
|
Chris@102
|
316 lock_guard<mutex> lk(mtx_);
|
Chris@102
|
317 return size(lk);
|
Chris@102
|
318 }
|
Chris@102
|
319
|
Chris@102
|
320 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
321 template <typename ValueType>
|
Chris@102
|
322 bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
323 {
|
Chris@102
|
324 if (empty(lk))
|
Chris@102
|
325 {
|
Chris@102
|
326 throw_if_closed(lk);
|
Chris@102
|
327 return false;
|
Chris@102
|
328 }
|
Chris@102
|
329 pull(elem, lk);
|
Chris@102
|
330 return true;
|
Chris@102
|
331 }
|
Chris@102
|
332 template <typename ValueType>
|
Chris@102
|
333 shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull(unique_lock<mutex>& lk)
|
Chris@102
|
334 {
|
Chris@102
|
335 if (empty(lk))
|
Chris@102
|
336 {
|
Chris@102
|
337 throw_if_closed(lk);
|
Chris@102
|
338 return shared_ptr<ValueType>();
|
Chris@102
|
339 }
|
Chris@102
|
340 return ptr_pull(lk);
|
Chris@102
|
341 }
|
Chris@102
|
342 template <typename ValueType>
|
Chris@102
|
343 bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem)
|
Chris@102
|
344 {
|
Chris@102
|
345 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
346 return try_pull(elem, lk);
|
Chris@102
|
347 }
|
Chris@102
|
348 #endif
|
Chris@102
|
349
|
Chris@102
|
350 template <typename ValueType>
|
Chris@102
|
351 queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
352 {
|
Chris@102
|
353 if (empty(lk))
|
Chris@102
|
354 {
|
Chris@102
|
355 if (closed(lk)) return queue_op_status::closed;
|
Chris@102
|
356 return queue_op_status::empty;
|
Chris@102
|
357 }
|
Chris@102
|
358 pull_front(elem, lk);
|
Chris@102
|
359 return queue_op_status::success;
|
Chris@102
|
360 }
|
Chris@102
|
361
|
Chris@102
|
362 template <typename ValueType>
|
Chris@102
|
363 queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem)
|
Chris@102
|
364 {
|
Chris@102
|
365 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
366 return try_pull_front(elem, lk);
|
Chris@102
|
367 }
|
Chris@102
|
368
|
Chris@102
|
369 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
370 template <typename ValueType>
|
Chris@102
|
371 bool sync_bounded_queue<ValueType>::try_pull(no_block_tag,ValueType& elem)
|
Chris@102
|
372 {
|
Chris@102
|
373 unique_lock<mutex> lk(mtx_, try_to_lock);
|
Chris@102
|
374 if (!lk.owns_lock())
|
Chris@102
|
375 {
|
Chris@102
|
376 return false;
|
Chris@102
|
377 }
|
Chris@102
|
378 return try_pull(elem, lk);
|
Chris@102
|
379 }
|
Chris@102
|
380 template <typename ValueType>
|
Chris@102
|
381 boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull()
|
Chris@102
|
382 {
|
Chris@102
|
383 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
384 return try_pull(lk);
|
Chris@102
|
385 }
|
Chris@102
|
386 #endif
|
Chris@102
|
387
|
Chris@102
|
388 template <typename ValueType>
|
Chris@102
|
389 queue_op_status sync_bounded_queue<ValueType>::nonblocking_pull_front(ValueType& elem)
|
Chris@102
|
390 {
|
Chris@102
|
391 unique_lock<mutex> lk(mtx_, try_to_lock);
|
Chris@102
|
392 if (!lk.owns_lock())
|
Chris@102
|
393 {
|
Chris@102
|
394 return queue_op_status::busy;
|
Chris@102
|
395 }
|
Chris@102
|
396 return try_pull_front(elem, lk);
|
Chris@102
|
397 }
|
Chris@102
|
398
|
Chris@102
|
399 template <typename ValueType>
|
Chris@102
|
400 void sync_bounded_queue<ValueType>::throw_if_closed(unique_lock<mutex>&)
|
Chris@102
|
401 {
|
Chris@102
|
402 if (closed_)
|
Chris@102
|
403 {
|
Chris@102
|
404 BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
|
Chris@102
|
405 }
|
Chris@102
|
406 }
|
Chris@102
|
407
|
Chris@102
|
408 template <typename ValueType>
|
Chris@102
|
409 void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk)
|
Chris@102
|
410 {
|
Chris@102
|
411 for (;;)
|
Chris@102
|
412 {
|
Chris@102
|
413 if (out_ != in_) break;
|
Chris@102
|
414 throw_if_closed(lk);
|
Chris@102
|
415 ++waiting_empty_;
|
Chris@102
|
416 not_empty_.wait(lk);
|
Chris@102
|
417 }
|
Chris@102
|
418 }
|
Chris@102
|
419 template <typename ValueType>
|
Chris@102
|
420 void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk, bool & closed)
|
Chris@102
|
421 {
|
Chris@102
|
422 for (;;)
|
Chris@102
|
423 {
|
Chris@102
|
424 if (out_ != in_) break;
|
Chris@102
|
425 if (closed_) {closed=true; return;}
|
Chris@102
|
426 ++waiting_empty_;
|
Chris@102
|
427 not_empty_.wait(lk);
|
Chris@102
|
428 }
|
Chris@102
|
429 }
|
Chris@102
|
430
|
Chris@102
|
431 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
432 template <typename ValueType>
|
Chris@102
|
433 void sync_bounded_queue<ValueType>::pull(ValueType& elem)
|
Chris@102
|
434 {
|
Chris@102
|
435 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
436 wait_until_not_empty(lk);
|
Chris@102
|
437 pull(elem, lk);
|
Chris@102
|
438 }
|
Chris@102
|
439 // template <typename ValueType>
|
Chris@102
|
440 // void sync_bounded_queue<ValueType>::pull(ValueType& elem, bool & closed)
|
Chris@102
|
441 // {
|
Chris@102
|
442 // unique_lock<mutex> lk(mtx_);
|
Chris@102
|
443 // wait_until_not_empty(lk, closed);
|
Chris@102
|
444 // if (closed) {return;}
|
Chris@102
|
445 // pull(elem, lk);
|
Chris@102
|
446 // }
|
Chris@102
|
447
|
Chris@102
|
448 // enable if ValueType is nothrow movable
|
Chris@102
|
449 template <typename ValueType>
|
Chris@102
|
450 ValueType sync_bounded_queue<ValueType>::pull()
|
Chris@102
|
451 {
|
Chris@102
|
452 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
453 wait_until_not_empty(lk);
|
Chris@102
|
454 return pull(lk);
|
Chris@102
|
455 }
|
Chris@102
|
456 template <typename ValueType>
|
Chris@102
|
457 boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::ptr_pull()
|
Chris@102
|
458 {
|
Chris@102
|
459 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
460 wait_until_not_empty(lk);
|
Chris@102
|
461 return ptr_pull(lk);
|
Chris@102
|
462 }
|
Chris@102
|
463
|
Chris@102
|
464 #endif
|
Chris@102
|
465
|
Chris@102
|
466 template <typename ValueType>
|
Chris@102
|
467 void sync_bounded_queue<ValueType>::pull_front(ValueType& elem)
|
Chris@102
|
468 {
|
Chris@102
|
469 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
470 wait_until_not_empty(lk);
|
Chris@102
|
471 pull_front(elem, lk);
|
Chris@102
|
472 }
|
Chris@102
|
473
|
Chris@102
|
474 // enable if ValueType is nothrow movable
|
Chris@102
|
475 template <typename ValueType>
|
Chris@102
|
476 ValueType sync_bounded_queue<ValueType>::pull_front()
|
Chris@102
|
477 {
|
Chris@102
|
478 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
479 wait_until_not_empty(lk);
|
Chris@102
|
480 return pull_front(lk);
|
Chris@102
|
481 }
|
Chris@102
|
482
|
Chris@102
|
483 template <typename ValueType>
|
Chris@102
|
484 queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
485 {
|
Chris@102
|
486 if (empty(lk) && closed(lk)) {return queue_op_status::closed;}
|
Chris@102
|
487 wait_until_not_empty(lk);
|
Chris@102
|
488 pull_front(elem, lk);
|
Chris@102
|
489 return queue_op_status::success;
|
Chris@102
|
490 }
|
Chris@102
|
491 template <typename ValueType>
|
Chris@102
|
492 queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem)
|
Chris@102
|
493 {
|
Chris@102
|
494 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
495 return wait_pull_front(elem, lk);
|
Chris@102
|
496 }
|
Chris@102
|
497
|
Chris@102
|
498 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
499 template <typename ValueType>
|
Chris@102
|
500 bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
501 {
|
Chris@102
|
502 throw_if_closed(lk);
|
Chris@102
|
503 size_type in_p_1 = inc(in_);
|
Chris@102
|
504 if (in_p_1 == out_) // full()
|
Chris@102
|
505 {
|
Chris@102
|
506 return false;
|
Chris@102
|
507 }
|
Chris@102
|
508 push_at(elem, in_p_1, lk);
|
Chris@102
|
509 return true;
|
Chris@102
|
510 }
|
Chris@102
|
511 template <typename ValueType>
|
Chris@102
|
512 bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem)
|
Chris@102
|
513 {
|
Chris@102
|
514 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
515 return try_push(elem, lk);
|
Chris@102
|
516 }
|
Chris@102
|
517
|
Chris@102
|
518 #endif
|
Chris@102
|
519
|
Chris@102
|
520 template <typename ValueType>
|
Chris@102
|
521 queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
522 {
|
Chris@102
|
523 if (closed(lk)) return queue_op_status::closed;
|
Chris@102
|
524 size_type in_p_1 = inc(in_);
|
Chris@102
|
525 if (in_p_1 == out_) // full()
|
Chris@102
|
526 {
|
Chris@102
|
527 return queue_op_status::full;
|
Chris@102
|
528 }
|
Chris@102
|
529 push_at(elem, in_p_1, lk);
|
Chris@102
|
530 return queue_op_status::success;
|
Chris@102
|
531 }
|
Chris@102
|
532
|
Chris@102
|
533 template <typename ValueType>
|
Chris@102
|
534 queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem)
|
Chris@102
|
535 {
|
Chris@102
|
536 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
537 return try_push_back(elem, lk);
|
Chris@102
|
538 }
|
Chris@102
|
539
|
Chris@102
|
540 template <typename ValueType>
|
Chris@102
|
541 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
542 {
|
Chris@102
|
543 if (closed(lk)) return queue_op_status::closed;
|
Chris@102
|
544 push_at(elem, wait_until_not_full(lk), lk);
|
Chris@102
|
545 return queue_op_status::success;
|
Chris@102
|
546 }
|
Chris@102
|
547 template <typename ValueType>
|
Chris@102
|
548 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem)
|
Chris@102
|
549 {
|
Chris@102
|
550 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
551 return wait_push_back(elem, lk);
|
Chris@102
|
552 }
|
Chris@102
|
553
|
Chris@102
|
554
|
Chris@102
|
555 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
556 template <typename ValueType>
|
Chris@102
|
557 bool sync_bounded_queue<ValueType>::try_push(no_block_tag, const ValueType& elem)
|
Chris@102
|
558 {
|
Chris@102
|
559 unique_lock<mutex> lk(mtx_, try_to_lock);
|
Chris@102
|
560 if (!lk.owns_lock()) return false;
|
Chris@102
|
561 return try_push(elem, lk);
|
Chris@102
|
562 }
|
Chris@102
|
563 #endif
|
Chris@102
|
564
|
Chris@102
|
565 template <typename ValueType>
|
Chris@102
|
566 queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(const ValueType& elem)
|
Chris@102
|
567 {
|
Chris@102
|
568 unique_lock<mutex> lk(mtx_, try_to_lock);
|
Chris@102
|
569 if (!lk.owns_lock()) return queue_op_status::busy;
|
Chris@102
|
570 return try_push_back(elem, lk);
|
Chris@102
|
571 }
|
Chris@102
|
572
|
Chris@102
|
573 template <typename ValueType>
|
Chris@102
|
574 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::wait_until_not_full(unique_lock<mutex>& lk)
|
Chris@102
|
575 {
|
Chris@102
|
576 for (;;)
|
Chris@102
|
577 {
|
Chris@102
|
578 throw_if_closed(lk);
|
Chris@102
|
579 size_type in_p_1 = inc(in_);
|
Chris@102
|
580 if (in_p_1 != out_) // ! full()
|
Chris@102
|
581 {
|
Chris@102
|
582 return in_p_1;
|
Chris@102
|
583 }
|
Chris@102
|
584 ++waiting_full_;
|
Chris@102
|
585 not_full_.wait(lk);
|
Chris@102
|
586 }
|
Chris@102
|
587 }
|
Chris@102
|
588
|
Chris@102
|
589 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
590 template <typename ValueType>
|
Chris@102
|
591 void sync_bounded_queue<ValueType>::push(const ValueType& elem)
|
Chris@102
|
592 {
|
Chris@102
|
593 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
594 push_at(elem, wait_until_not_full(lk), lk);
|
Chris@102
|
595 }
|
Chris@102
|
596 #endif
|
Chris@102
|
597 template <typename ValueType>
|
Chris@102
|
598 void sync_bounded_queue<ValueType>::push_back(const ValueType& elem)
|
Chris@102
|
599 {
|
Chris@102
|
600 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
601 push_at(elem, wait_until_not_full(lk), lk);
|
Chris@102
|
602 }
|
Chris@102
|
603
|
Chris@102
|
604 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
605 template <typename ValueType>
|
Chris@102
|
606 bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
|
Chris@102
|
607 {
|
Chris@102
|
608 throw_if_closed(lk);
|
Chris@102
|
609 size_type in_p_1 = inc(in_);
|
Chris@102
|
610 if (in_p_1 == out_) // full()
|
Chris@102
|
611 {
|
Chris@102
|
612 return false;
|
Chris@102
|
613 }
|
Chris@102
|
614 push_at(boost::move(elem), in_p_1, lk);
|
Chris@102
|
615 return true;
|
Chris@102
|
616 }
|
Chris@102
|
617
|
Chris@102
|
618 template <typename ValueType>
|
Chris@102
|
619 bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
620 {
|
Chris@102
|
621 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
622 return try_push(boost::move(elem), lk);
|
Chris@102
|
623 }
|
Chris@102
|
624 #endif
|
Chris@102
|
625
|
Chris@102
|
626 template <typename ValueType>
|
Chris@102
|
627 queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
|
Chris@102
|
628 {
|
Chris@102
|
629 if (closed(lk)) return queue_op_status::closed;
|
Chris@102
|
630 size_type in_p_1 = inc(in_);
|
Chris@102
|
631 if (in_p_1 == out_) // full()
|
Chris@102
|
632 {
|
Chris@102
|
633 return queue_op_status::full;
|
Chris@102
|
634 }
|
Chris@102
|
635 push_at(boost::move(elem), in_p_1, lk);
|
Chris@102
|
636 return queue_op_status::success;
|
Chris@102
|
637 }
|
Chris@102
|
638 template <typename ValueType>
|
Chris@102
|
639 queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
640 {
|
Chris@102
|
641 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
642 return try_push_back(boost::move(elem), lk);
|
Chris@102
|
643 }
|
Chris@102
|
644
|
Chris@102
|
645 template <typename ValueType>
|
Chris@102
|
646 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
|
Chris@102
|
647 {
|
Chris@102
|
648 if (closed(lk)) return queue_op_status::closed;
|
Chris@102
|
649 push_at(boost::move(elem), wait_until_not_full(lk), lk);
|
Chris@102
|
650 return queue_op_status::success;
|
Chris@102
|
651 }
|
Chris@102
|
652 template <typename ValueType>
|
Chris@102
|
653 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
654 {
|
Chris@102
|
655 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
656 return try_push_back(boost::move(elem), lk);
|
Chris@102
|
657 }
|
Chris@102
|
658
|
Chris@102
|
659
|
Chris@102
|
660 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
661 template <typename ValueType>
|
Chris@102
|
662 bool sync_bounded_queue<ValueType>::try_push(no_block_tag, BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
663 {
|
Chris@102
|
664 unique_lock<mutex> lk(mtx_, try_to_lock);
|
Chris@102
|
665 if (!lk.owns_lock())
|
Chris@102
|
666 {
|
Chris@102
|
667 return false;
|
Chris@102
|
668 }
|
Chris@102
|
669 return try_push(boost::move(elem), lk);
|
Chris@102
|
670 }
|
Chris@102
|
671 #endif
|
Chris@102
|
672 template <typename ValueType>
|
Chris@102
|
673 queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
674 {
|
Chris@102
|
675 unique_lock<mutex> lk(mtx_, try_to_lock);
|
Chris@102
|
676 if (!lk.owns_lock())
|
Chris@102
|
677 {
|
Chris@102
|
678 return queue_op_status::busy;
|
Chris@102
|
679 }
|
Chris@102
|
680 return try_push_back(boost::move(elem), lk);
|
Chris@102
|
681 }
|
Chris@102
|
682
|
Chris@102
|
683 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
|
Chris@102
|
684 template <typename ValueType>
|
Chris@102
|
685 void sync_bounded_queue<ValueType>::push(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
686 {
|
Chris@102
|
687 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
688 push_at(boost::move(elem), wait_until_not_full(lk), lk);
|
Chris@102
|
689 }
|
Chris@102
|
690 #endif
|
Chris@102
|
691 template <typename ValueType>
|
Chris@102
|
692 void sync_bounded_queue<ValueType>::push_back(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
693 {
|
Chris@102
|
694 unique_lock<mutex> lk(mtx_);
|
Chris@102
|
695 push_at(boost::move(elem), wait_until_not_full(lk), lk);
|
Chris@102
|
696 }
|
Chris@102
|
697
|
Chris@102
|
698 template <typename ValueType>
|
Chris@102
|
699 sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
700 {
|
Chris@102
|
701 sbq.push_back(boost::move(elem));
|
Chris@102
|
702 return sbq;
|
Chris@102
|
703 }
|
Chris@102
|
704
|
Chris@102
|
705 template <typename ValueType>
|
Chris@102
|
706 sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType const&elem)
|
Chris@102
|
707 {
|
Chris@102
|
708 sbq.push_back(elem);
|
Chris@102
|
709 return sbq;
|
Chris@102
|
710 }
|
Chris@102
|
711
|
Chris@102
|
712 template <typename ValueType>
|
Chris@102
|
713 sync_bounded_queue<ValueType>& operator>>(sync_bounded_queue<ValueType>& sbq, ValueType &elem)
|
Chris@102
|
714 {
|
Chris@102
|
715 sbq.pull_front(elem);
|
Chris@102
|
716 return sbq;
|
Chris@102
|
717 }
|
Chris@102
|
718 }
|
Chris@102
|
719 using concurrent::sync_bounded_queue;
|
Chris@102
|
720
|
Chris@102
|
721 }
|
Chris@102
|
722
|
Chris@102
|
723 #include <boost/config/abi_suffix.hpp>
|
Chris@102
|
724
|
Chris@102
|
725 #endif
|