Chris@102
|
1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
|
Chris@102
|
2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
|
Chris@102
|
3
|
Chris@102
|
4 //////////////////////////////////////////////////////////////////////////////
|
Chris@102
|
5 //
|
Chris@102
|
6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
|
Chris@102
|
7 // Software License, Version 1.0. (See accompanying file
|
Chris@102
|
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
Chris@102
|
9 //
|
Chris@102
|
10 // See http://www.boost.org/libs/thread for documentation.
|
Chris@102
|
11 //
|
Chris@102
|
12 //////////////////////////////////////////////////////////////////////////////
|
Chris@102
|
13
|
Chris@102
|
14 #include <boost/thread/detail/config.hpp>
|
Chris@102
|
15 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
|
Chris@102
|
16 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
|
Chris@102
|
17 #include <boost/thread/condition_variable.hpp>
|
Chris@102
|
18 #include <boost/thread/csbl/devector.hpp>
|
Chris@102
|
19 #include <boost/thread/detail/move.hpp>
|
Chris@102
|
20 #include <boost/thread/mutex.hpp>
|
Chris@102
|
21
|
Chris@102
|
22 #include <boost/throw_exception.hpp>
|
Chris@102
|
23 #include <boost/smart_ptr/shared_ptr.hpp>
|
Chris@102
|
24 #include <boost/smart_ptr/make_shared.hpp>
|
Chris@102
|
25
|
Chris@102
|
26 #include <boost/config/abi_prefix.hpp>
|
Chris@102
|
27
|
Chris@102
|
28 namespace boost
|
Chris@102
|
29 {
|
Chris@102
|
30 namespace concurrent
|
Chris@102
|
31 {
|
Chris@102
|
32 template <class ValueType, class Container = csbl::devector<ValueType> >
|
Chris@102
|
33 class sync_queue
|
Chris@102
|
34 : public detail::sync_queue_base<ValueType, Container >
|
Chris@102
|
35 {
|
Chris@102
|
36 typedef detail::sync_queue_base<ValueType, Container > super;
|
Chris@102
|
37
|
Chris@102
|
38 public:
|
Chris@102
|
39 typedef ValueType value_type;
|
Chris@102
|
40 //typedef typename super::value_type value_type; // fixme
|
Chris@102
|
41 typedef typename super::underlying_queue_type underlying_queue_type;
|
Chris@102
|
42 typedef typename super::size_type size_type;
|
Chris@102
|
43 typedef typename super::op_status op_status;
|
Chris@102
|
44
|
Chris@102
|
45 // Constructors/Assignment/Destructors
|
Chris@102
|
46 BOOST_THREAD_NO_COPYABLE(sync_queue)
|
Chris@102
|
47 inline sync_queue();
|
Chris@102
|
48 //template <class Range>
|
Chris@102
|
49 //inline explicit sync_queue(Range range);
|
Chris@102
|
50 inline ~sync_queue();
|
Chris@102
|
51
|
Chris@102
|
52 // Modifiers
|
Chris@102
|
53
|
Chris@102
|
54 inline void push(const value_type& x);
|
Chris@102
|
55 inline queue_op_status try_push(const value_type& x);
|
Chris@102
|
56 inline queue_op_status nonblocking_push(const value_type& x);
|
Chris@102
|
57 inline queue_op_status wait_push(const value_type& x);
|
Chris@102
|
58 inline void push(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
59 inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
60 inline queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
61 inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x);
|
Chris@102
|
62
|
Chris@102
|
63 // Observers/Modifiers
|
Chris@102
|
64 inline void pull(value_type&);
|
Chris@102
|
65 // enable_if is_nothrow_copy_movable<value_type>
|
Chris@102
|
66 inline value_type pull();
|
Chris@102
|
67
|
Chris@102
|
68 inline queue_op_status try_pull(value_type&);
|
Chris@102
|
69 inline queue_op_status nonblocking_pull(value_type&);
|
Chris@102
|
70 inline queue_op_status wait_pull(ValueType& elem);
|
Chris@102
|
71
|
Chris@102
|
72 private:
|
Chris@102
|
73
|
Chris@102
|
74 inline queue_op_status try_pull(value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
75 inline queue_op_status wait_pull(value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
76 inline queue_op_status try_push(const value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
77 inline queue_op_status wait_push(const value_type& x, unique_lock<mutex>& lk);
|
Chris@102
|
78 inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
|
Chris@102
|
79 inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
|
Chris@102
|
80
|
Chris@102
|
81 inline void pull(value_type& elem, unique_lock<mutex>& )
|
Chris@102
|
82 {
|
Chris@102
|
83 elem = boost::move(super::data_.front());
|
Chris@102
|
84 super::data_.pop_front();
|
Chris@102
|
85 }
|
Chris@102
|
86 inline value_type pull(unique_lock<mutex>& )
|
Chris@102
|
87 {
|
Chris@102
|
88 value_type e = boost::move(super::data_.front());
|
Chris@102
|
89 super::data_.pop_front();
|
Chris@102
|
90 return boost::move(e);
|
Chris@102
|
91 }
|
Chris@102
|
92
|
Chris@102
|
93 inline void push(const value_type& elem, unique_lock<mutex>& lk)
|
Chris@102
|
94 {
|
Chris@102
|
95 super::data_.push_back(elem);
|
Chris@102
|
96 super::notify_not_empty_if_needed(lk);
|
Chris@102
|
97 }
|
Chris@102
|
98
|
Chris@102
|
99 inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk)
|
Chris@102
|
100 {
|
Chris@102
|
101 super::data_.push_back(boost::move(elem));
|
Chris@102
|
102 super::notify_not_empty_if_needed(lk);
|
Chris@102
|
103 }
|
Chris@102
|
104 };
|
Chris@102
|
105
|
Chris@102
|
106 template <class ValueType, class Container>
|
Chris@102
|
107 sync_queue<ValueType, Container>::sync_queue() :
|
Chris@102
|
108 super()
|
Chris@102
|
109 {
|
Chris@102
|
110 }
|
Chris@102
|
111
|
Chris@102
|
112 // template <class ValueType, class Container>
|
Chris@102
|
113 // template <class Range>
|
Chris@102
|
114 // explicit sync_queue<ValueType, Container>::sync_queue(Range range) :
|
Chris@102
|
115 // data_(), closed_(false)
|
Chris@102
|
116 // {
|
Chris@102
|
117 // try
|
Chris@102
|
118 // {
|
Chris@102
|
119 // typedef typename Range::iterator iterator_t;
|
Chris@102
|
120 // iterator_t first = boost::begin(range);
|
Chris@102
|
121 // iterator_t end = boost::end(range);
|
Chris@102
|
122 // for (iterator_t cur = first; cur != end; ++cur)
|
Chris@102
|
123 // {
|
Chris@102
|
124 // data_.push(boost::move(*cur));;
|
Chris@102
|
125 // }
|
Chris@102
|
126 // notify_not_empty_if_needed(lk);
|
Chris@102
|
127 // }
|
Chris@102
|
128 // catch (...)
|
Chris@102
|
129 // {
|
Chris@102
|
130 // delete[] data_;
|
Chris@102
|
131 // }
|
Chris@102
|
132 // }
|
Chris@102
|
133
|
Chris@102
|
134 template <class ValueType, class Container>
|
Chris@102
|
135 sync_queue<ValueType, Container>::~sync_queue()
|
Chris@102
|
136 {
|
Chris@102
|
137 }
|
Chris@102
|
138
|
Chris@102
|
139 template <class ValueType, class Container>
|
Chris@102
|
140 queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
141 {
|
Chris@102
|
142 if (super::empty(lk))
|
Chris@102
|
143 {
|
Chris@102
|
144 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
145 return queue_op_status::empty;
|
Chris@102
|
146 }
|
Chris@102
|
147 pull(elem, lk);
|
Chris@102
|
148 return queue_op_status::success;
|
Chris@102
|
149 }
|
Chris@102
|
150 template <class ValueType, class Container>
|
Chris@102
|
151 queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
152 {
|
Chris@102
|
153 if (super::empty(lk))
|
Chris@102
|
154 {
|
Chris@102
|
155 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
156 }
|
Chris@102
|
157 bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
|
Chris@102
|
158 if (has_been_closed) return queue_op_status::closed;
|
Chris@102
|
159 pull(elem, lk);
|
Chris@102
|
160 return queue_op_status::success;
|
Chris@102
|
161 }
|
Chris@102
|
162
|
Chris@102
|
163 template <class ValueType, class Container>
|
Chris@102
|
164 queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem)
|
Chris@102
|
165 {
|
Chris@102
|
166 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
167 return try_pull(elem, lk);
|
Chris@102
|
168 }
|
Chris@102
|
169
|
Chris@102
|
170 template <class ValueType, class Container>
|
Chris@102
|
171 queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem)
|
Chris@102
|
172 {
|
Chris@102
|
173 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
174 return wait_pull(elem, lk);
|
Chris@102
|
175 }
|
Chris@102
|
176
|
Chris@102
|
177 template <class ValueType, class Container>
|
Chris@102
|
178 queue_op_status sync_queue<ValueType, Container>::nonblocking_pull(ValueType& elem)
|
Chris@102
|
179 {
|
Chris@102
|
180 unique_lock<mutex> lk(super::mtx_, try_to_lock);
|
Chris@102
|
181 if (!lk.owns_lock())
|
Chris@102
|
182 {
|
Chris@102
|
183 return queue_op_status::busy;
|
Chris@102
|
184 }
|
Chris@102
|
185 return try_pull(elem, lk);
|
Chris@102
|
186 }
|
Chris@102
|
187
|
Chris@102
|
188 template <class ValueType, class Container>
|
Chris@102
|
189 void sync_queue<ValueType, Container>::pull(ValueType& elem)
|
Chris@102
|
190 {
|
Chris@102
|
191 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
192 super::wait_until_not_empty(lk);
|
Chris@102
|
193 pull(elem, lk);
|
Chris@102
|
194 }
|
Chris@102
|
195
|
Chris@102
|
196 // enable if ValueType is nothrow movable
|
Chris@102
|
197 template <class ValueType, class Container>
|
Chris@102
|
198 ValueType sync_queue<ValueType, Container>::pull()
|
Chris@102
|
199 {
|
Chris@102
|
200 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
201 super::wait_until_not_empty(lk);
|
Chris@102
|
202 return pull(lk);
|
Chris@102
|
203 }
|
Chris@102
|
204
|
Chris@102
|
205 template <class ValueType, class Container>
|
Chris@102
|
206 queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
207 {
|
Chris@102
|
208 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
209 push(elem, lk);
|
Chris@102
|
210 return queue_op_status::success;
|
Chris@102
|
211 }
|
Chris@102
|
212
|
Chris@102
|
213 template <class ValueType, class Container>
|
Chris@102
|
214 queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem)
|
Chris@102
|
215 {
|
Chris@102
|
216 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
217 return try_push(elem, lk);
|
Chris@102
|
218 }
|
Chris@102
|
219
|
Chris@102
|
220 template <class ValueType, class Container>
|
Chris@102
|
221 queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem, unique_lock<mutex>& lk)
|
Chris@102
|
222 {
|
Chris@102
|
223 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
224 push(elem, lk);
|
Chris@102
|
225 return queue_op_status::success;
|
Chris@102
|
226 }
|
Chris@102
|
227
|
Chris@102
|
228 template <class ValueType, class Container>
|
Chris@102
|
229 queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem)
|
Chris@102
|
230 {
|
Chris@102
|
231 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
232 return wait_push(elem, lk);
|
Chris@102
|
233 }
|
Chris@102
|
234
|
Chris@102
|
235 template <class ValueType, class Container>
|
Chris@102
|
236 queue_op_status sync_queue<ValueType, Container>::nonblocking_push(const ValueType& elem)
|
Chris@102
|
237 {
|
Chris@102
|
238 unique_lock<mutex> lk(super::mtx_, try_to_lock);
|
Chris@102
|
239 if (!lk.owns_lock()) return queue_op_status::busy;
|
Chris@102
|
240 return try_push(elem, lk);
|
Chris@102
|
241 }
|
Chris@102
|
242
|
Chris@102
|
243 template <class ValueType, class Container>
|
Chris@102
|
244 void sync_queue<ValueType, Container>::push(const ValueType& elem)
|
Chris@102
|
245 {
|
Chris@102
|
246 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
247 super::throw_if_closed(lk);
|
Chris@102
|
248 push(elem, lk);
|
Chris@102
|
249 }
|
Chris@102
|
250
|
Chris@102
|
251 template <class ValueType, class Container>
|
Chris@102
|
252 queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
|
Chris@102
|
253 {
|
Chris@102
|
254 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
255 push(boost::move(elem), lk);
|
Chris@102
|
256 return queue_op_status::success;
|
Chris@102
|
257 }
|
Chris@102
|
258
|
Chris@102
|
259 template <class ValueType, class Container>
|
Chris@102
|
260 queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
261 {
|
Chris@102
|
262 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
263 return try_push(boost::move(elem), lk);
|
Chris@102
|
264 }
|
Chris@102
|
265
|
Chris@102
|
266 template <class ValueType, class Container>
|
Chris@102
|
267 queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
|
Chris@102
|
268 {
|
Chris@102
|
269 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
270 push(boost::move(elem), lk);
|
Chris@102
|
271 return queue_op_status::success;
|
Chris@102
|
272 }
|
Chris@102
|
273
|
Chris@102
|
274 template <class ValueType, class Container>
|
Chris@102
|
275 queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
276 {
|
Chris@102
|
277 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
278 return wait_push(boost::move(elem), lk);
|
Chris@102
|
279 }
|
Chris@102
|
280
|
Chris@102
|
281 template <class ValueType, class Container>
|
Chris@102
|
282 queue_op_status sync_queue<ValueType, Container>::nonblocking_push(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
283 {
|
Chris@102
|
284 unique_lock<mutex> lk(super::mtx_, try_to_lock);
|
Chris@102
|
285 if (!lk.owns_lock())
|
Chris@102
|
286 {
|
Chris@102
|
287 return queue_op_status::busy;
|
Chris@102
|
288 }
|
Chris@102
|
289 return try_push(boost::move(elem), lk);
|
Chris@102
|
290 }
|
Chris@102
|
291
|
Chris@102
|
292 template <class ValueType, class Container>
|
Chris@102
|
293 void sync_queue<ValueType, Container>::push(BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
294 {
|
Chris@102
|
295 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
296 super::throw_if_closed(lk);
|
Chris@102
|
297 push(boost::move(elem), lk);
|
Chris@102
|
298 }
|
Chris@102
|
299
|
Chris@102
|
300 template <class ValueType, class Container>
|
Chris@102
|
301 sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
|
Chris@102
|
302 {
|
Chris@102
|
303 sbq.push(boost::move(elem));
|
Chris@102
|
304 return sbq;
|
Chris@102
|
305 }
|
Chris@102
|
306
|
Chris@102
|
307 template <class ValueType, class Container>
|
Chris@102
|
308 sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, ValueType const&elem)
|
Chris@102
|
309 {
|
Chris@102
|
310 sbq.push(elem);
|
Chris@102
|
311 return sbq;
|
Chris@102
|
312 }
|
Chris@102
|
313
|
Chris@102
|
314 template <class ValueType, class Container>
|
Chris@102
|
315 sync_queue<ValueType, Container>& operator>>(sync_queue<ValueType, Container>& sbq, ValueType &elem)
|
Chris@102
|
316 {
|
Chris@102
|
317 sbq.pull(elem);
|
Chris@102
|
318 return sbq;
|
Chris@102
|
319 }
|
Chris@102
|
320
|
Chris@102
|
321 }
|
Chris@102
|
322 using concurrent::sync_queue;
|
Chris@102
|
323
|
Chris@102
|
324 }
|
Chris@102
|
325
|
Chris@102
|
326 #include <boost/config/abi_suffix.hpp>
|
Chris@102
|
327
|
Chris@102
|
328 #endif
|