Chris@102
|
1 // Copyright (C) 2014 Ian Forbed
|
Chris@102
|
2 // Copyright (C) 2014 Vicente J. Botet Escriba
|
Chris@102
|
3 //
|
Chris@102
|
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
|
Chris@102
|
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
Chris@102
|
6 //
|
Chris@102
|
7
|
Chris@102
|
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
|
Chris@102
|
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
|
Chris@102
|
10
|
Chris@102
|
11 #include <boost/thread/detail/config.hpp>
|
Chris@102
|
12
|
Chris@102
|
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
|
Chris@102
|
14 #include <boost/chrono/duration.hpp>
|
Chris@102
|
15 #include <boost/chrono/time_point.hpp>
|
Chris@102
|
16 #include <boost/chrono/system_clocks.hpp>
|
Chris@102
|
17 #include <boost/chrono/chrono_io.hpp>
|
Chris@102
|
18
|
Chris@102
|
19 #include <boost/config/abi_prefix.hpp>
|
Chris@102
|
20
|
Chris@102
|
21 namespace boost
|
Chris@102
|
22 {
|
Chris@102
|
23 namespace concurrent
|
Chris@102
|
24 {
|
Chris@102
|
25 namespace detail
|
Chris@102
|
26 {
|
Chris@102
|
27 template <class T, class Clock = chrono::steady_clock>
|
Chris@102
|
28 struct scheduled_type
|
Chris@102
|
29 {
|
Chris@102
|
30 typedef T value_type;
|
Chris@102
|
31 typedef Clock clock;
|
Chris@102
|
32 typedef typename clock::time_point time_point;
|
Chris@102
|
33 T data;
|
Chris@102
|
34 time_point time;
|
Chris@102
|
35
|
Chris@102
|
36 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
|
Chris@102
|
37
|
Chris@102
|
38 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
|
Chris@102
|
39 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
|
Chris@102
|
40
|
Chris@102
|
41 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
|
Chris@102
|
42 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
|
Chris@102
|
43 data = other.data;
|
Chris@102
|
44 time = other.time;
|
Chris@102
|
45 return *this;
|
Chris@102
|
46 }
|
Chris@102
|
47
|
Chris@102
|
48 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
|
Chris@102
|
49 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
|
Chris@102
|
50 data = boost::move(other.data);
|
Chris@102
|
51 time = other.time;
|
Chris@102
|
52 return *this;
|
Chris@102
|
53 }
|
Chris@102
|
54
|
Chris@102
|
55 bool time_not_reached() const
|
Chris@102
|
56 {
|
Chris@102
|
57 return time > clock::now();
|
Chris@102
|
58 }
|
Chris@102
|
59
|
Chris@102
|
60 bool operator <(const scheduled_type<T> other) const
|
Chris@102
|
61 {
|
Chris@102
|
62 return this->time > other.time;
|
Chris@102
|
63 }
|
Chris@102
|
64 }; //end struct
|
Chris@102
|
65
|
Chris@102
|
66 } //end detail namespace
|
Chris@102
|
67
|
Chris@102
|
68 template <class T, class Clock = chrono::steady_clock>
|
Chris@102
|
69 class sync_timed_queue
|
Chris@102
|
70 : private sync_priority_queue<detail::scheduled_type<T, Clock> >
|
Chris@102
|
71 {
|
Chris@102
|
72 typedef detail::scheduled_type<T> stype;
|
Chris@102
|
73 typedef sync_priority_queue<stype> super;
|
Chris@102
|
74 public:
|
Chris@102
|
75 typedef T value_type;
|
Chris@102
|
76 typedef Clock clock;
|
Chris@102
|
77 typedef typename clock::duration duration;
|
Chris@102
|
78 typedef typename clock::time_point time_point;
|
Chris@102
|
79 typedef typename super::underlying_queue_type underlying_queue_type;
|
Chris@102
|
80 typedef typename super::size_type size_type;
|
Chris@102
|
81 typedef typename super::op_status op_status;
|
Chris@102
|
82
|
Chris@102
|
83 sync_timed_queue() : super() {};
|
Chris@102
|
84 ~sync_timed_queue() {}
|
Chris@102
|
85
|
Chris@102
|
86 using super::size;
|
Chris@102
|
87 using super::empty;
|
Chris@102
|
88 using super::full;
|
Chris@102
|
89 using super::close;
|
Chris@102
|
90 using super::closed;
|
Chris@102
|
91
|
Chris@102
|
92 T pull();
|
Chris@102
|
93 void pull(T& elem);
|
Chris@102
|
94
|
Chris@102
|
95 template <class Duration>
|
Chris@102
|
96 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
|
Chris@102
|
97 template <class Rep, class Period>
|
Chris@102
|
98 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
|
Chris@102
|
99
|
Chris@102
|
100 queue_op_status try_pull(T& elem);
|
Chris@102
|
101 queue_op_status wait_pull(T& elem);
|
Chris@102
|
102 queue_op_status nonblocking_pull(T& elem);
|
Chris@102
|
103
|
Chris@102
|
104 template <class Duration>
|
Chris@102
|
105 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
|
Chris@102
|
106 template <class Rep, class Period>
|
Chris@102
|
107 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
|
Chris@102
|
108
|
Chris@102
|
109 template <class Duration>
|
Chris@102
|
110 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
|
Chris@102
|
111 template <class Rep, class Period>
|
Chris@102
|
112 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
|
Chris@102
|
113
|
Chris@102
|
114 template <class Duration>
|
Chris@102
|
115 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
|
Chris@102
|
116 template <class Rep, class Period>
|
Chris@102
|
117 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
|
Chris@102
|
118
|
Chris@102
|
119 template <class Duration>
|
Chris@102
|
120 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
|
Chris@102
|
121 template <class Rep, class Period>
|
Chris@102
|
122 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
|
Chris@102
|
123
|
Chris@102
|
124 private:
|
Chris@102
|
125 T pull(unique_lock<mutex>&);
|
Chris@102
|
126 T pull(lock_guard<mutex>&);
|
Chris@102
|
127
|
Chris@102
|
128 void pull(unique_lock<mutex>&, T& elem);
|
Chris@102
|
129 void pull(lock_guard<mutex>&, T& elem);
|
Chris@102
|
130
|
Chris@102
|
131 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
|
Chris@102
|
132 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
|
Chris@102
|
133
|
Chris@102
|
134 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
|
Chris@102
|
135
|
Chris@102
|
136 bool wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>&);
|
Chris@102
|
137 T pull_when_time_reached(unique_lock<mutex>&);
|
Chris@102
|
138 template <class Duration>
|
Chris@102
|
139 queue_op_status pull_when_time_reached_until(unique_lock<mutex>&, chrono::time_point<clock,Duration> const& tp, T& elem);
|
Chris@102
|
140 bool time_not_reached(unique_lock<mutex>&);
|
Chris@102
|
141 bool time_not_reached(lock_guard<mutex>&);
|
Chris@102
|
142 bool empty_or_time_not_reached(unique_lock<mutex>&);
|
Chris@102
|
143 bool empty_or_time_not_reached(lock_guard<mutex>&);
|
Chris@102
|
144
|
Chris@102
|
145 sync_timed_queue(const sync_timed_queue&);
|
Chris@102
|
146 sync_timed_queue& operator=(const sync_timed_queue&);
|
Chris@102
|
147 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
|
Chris@102
|
148 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
|
Chris@102
|
149 }; //end class
|
Chris@102
|
150
|
Chris@102
|
151
|
Chris@102
|
152 template <class T, class Clock>
|
Chris@102
|
153 template <class Duration>
|
Chris@102
|
154 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
|
Chris@102
|
155 {
|
Chris@102
|
156 super::push(stype(elem,tp));
|
Chris@102
|
157 }
|
Chris@102
|
158
|
Chris@102
|
159 template <class T, class Clock>
|
Chris@102
|
160 template <class Rep, class Period>
|
Chris@102
|
161 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
|
Chris@102
|
162 {
|
Chris@102
|
163 push(elem, clock::now() + dura);
|
Chris@102
|
164 }
|
Chris@102
|
165
|
Chris@102
|
166 template <class T, class Clock>
|
Chris@102
|
167 template <class Duration>
|
Chris@102
|
168 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
|
Chris@102
|
169 {
|
Chris@102
|
170 super::push(stype(boost::move(elem),tp));
|
Chris@102
|
171 }
|
Chris@102
|
172
|
Chris@102
|
173 template <class T, class Clock>
|
Chris@102
|
174 template <class Rep, class Period>
|
Chris@102
|
175 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
|
Chris@102
|
176 {
|
Chris@102
|
177 push(boost::move(elem), clock::now() + dura);
|
Chris@102
|
178 }
|
Chris@102
|
179
|
Chris@102
|
180
|
Chris@102
|
181
|
Chris@102
|
182 template <class T, class Clock>
|
Chris@102
|
183 template <class Duration>
|
Chris@102
|
184 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
|
Chris@102
|
185 {
|
Chris@102
|
186 return super::try_push(stype(elem,tp));
|
Chris@102
|
187 }
|
Chris@102
|
188
|
Chris@102
|
189 template <class T, class Clock>
|
Chris@102
|
190 template <class Rep, class Period>
|
Chris@102
|
191 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
|
Chris@102
|
192 {
|
Chris@102
|
193 return try_push(elem,clock::now() + dura);
|
Chris@102
|
194 }
|
Chris@102
|
195
|
Chris@102
|
196 template <class T, class Clock>
|
Chris@102
|
197 template <class Duration>
|
Chris@102
|
198 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
|
Chris@102
|
199 {
|
Chris@102
|
200 return super::try_push(stype(boost::move(elem), tp));
|
Chris@102
|
201 }
|
Chris@102
|
202
|
Chris@102
|
203 template <class T, class Clock>
|
Chris@102
|
204 template <class Rep, class Period>
|
Chris@102
|
205 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
|
Chris@102
|
206 {
|
Chris@102
|
207 return try_push(boost::move(elem), clock::now() + dura);
|
Chris@102
|
208 }
|
Chris@102
|
209
|
Chris@102
|
210 ///////////////////////////
|
Chris@102
|
211 template <class T, class Clock>
|
Chris@102
|
212 bool sync_timed_queue<T, Clock>::time_not_reached(unique_lock<mutex>&)
|
Chris@102
|
213 {
|
Chris@102
|
214 return super::data_.top().time_not_reached();
|
Chris@102
|
215 }
|
Chris@102
|
216
|
Chris@102
|
217 template <class T, class Clock>
|
Chris@102
|
218 bool sync_timed_queue<T, Clock>::time_not_reached(lock_guard<mutex>&)
|
Chris@102
|
219 {
|
Chris@102
|
220 return super::data_.top().time_not_reached();
|
Chris@102
|
221 }
|
Chris@102
|
222
|
Chris@102
|
223 ///////////////////////////
|
Chris@102
|
224 template <class T, class Clock>
|
Chris@102
|
225 bool sync_timed_queue<T, Clock>::wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>& lk)
|
Chris@102
|
226 {
|
Chris@102
|
227 for (;;)
|
Chris@102
|
228 {
|
Chris@102
|
229 if (super::closed(lk)) return true;
|
Chris@102
|
230 while (! super::empty(lk)) {
|
Chris@102
|
231 if (! time_not_reached(lk)) return false;
|
Chris@102
|
232 super::not_empty_.wait_until(lk, super::data_.top().time);
|
Chris@102
|
233 if (super::closed(lk)) return true;
|
Chris@102
|
234 }
|
Chris@102
|
235 if (super::closed(lk)) return true;
|
Chris@102
|
236 super::not_empty_.wait(lk);
|
Chris@102
|
237 }
|
Chris@102
|
238 return false;
|
Chris@102
|
239 }
|
Chris@102
|
240
|
Chris@102
|
241 ///////////////////////////
|
Chris@102
|
242 template <class T, class Clock>
|
Chris@102
|
243 T sync_timed_queue<T, Clock>::pull_when_time_reached(unique_lock<mutex>& lk)
|
Chris@102
|
244 {
|
Chris@102
|
245 while (time_not_reached(lk))
|
Chris@102
|
246 {
|
Chris@102
|
247 super::throw_if_closed(lk);
|
Chris@102
|
248 super::not_empty_.wait_until(lk,super::data_.top().time);
|
Chris@102
|
249 super::wait_until_not_empty(lk);
|
Chris@102
|
250 }
|
Chris@102
|
251 return pull(lk);
|
Chris@102
|
252 }
|
Chris@102
|
253
|
Chris@102
|
254 template <class T, class Clock>
|
Chris@102
|
255 template <class Duration>
|
Chris@102
|
256 queue_op_status
|
Chris@102
|
257 sync_timed_queue<T, Clock>::pull_when_time_reached_until(unique_lock<mutex>& lk, chrono::time_point<clock,Duration> const& tp, T& elem)
|
Chris@102
|
258 {
|
Chris@102
|
259 chrono::time_point<clock,Duration> tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time;
|
Chris@102
|
260 while (time_not_reached(lk))
|
Chris@102
|
261 {
|
Chris@102
|
262 super::throw_if_closed(lk);
|
Chris@102
|
263 if (queue_op_status::timeout == super::not_empty_.wait_until(lk, tpmin)) {
|
Chris@102
|
264 if (time_not_reached(lk)) return queue_op_status::not_ready;
|
Chris@102
|
265 return queue_op_status::timeout;
|
Chris@102
|
266 }
|
Chris@102
|
267 }
|
Chris@102
|
268 pull(lk, elem);
|
Chris@102
|
269 return queue_op_status::success;
|
Chris@102
|
270 }
|
Chris@102
|
271
|
Chris@102
|
272 ///////////////////////////
|
Chris@102
|
273 template <class T, class Clock>
|
Chris@102
|
274 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(unique_lock<mutex>& lk)
|
Chris@102
|
275 {
|
Chris@102
|
276 if ( super::empty(lk) ) return true;
|
Chris@102
|
277 if ( time_not_reached(lk) ) return true;
|
Chris@102
|
278 return false;
|
Chris@102
|
279 }
|
Chris@102
|
280 template <class T, class Clock>
|
Chris@102
|
281 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(lock_guard<mutex>& lk)
|
Chris@102
|
282 {
|
Chris@102
|
283 if ( super::empty(lk) ) return true;
|
Chris@102
|
284 if ( time_not_reached(lk) ) return true;
|
Chris@102
|
285 return false;
|
Chris@102
|
286 }
|
Chris@102
|
287
|
Chris@102
|
288 ///////////////////////////
|
Chris@102
|
289 template <class T, class Clock>
|
Chris@102
|
290 T sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&)
|
Chris@102
|
291 {
|
Chris@102
|
292 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
|
Chris@102
|
293 return boost::move(super::data_.pull().data);
|
Chris@102
|
294 #else
|
Chris@102
|
295 return super::data_.pull().data;
|
Chris@102
|
296 #endif
|
Chris@102
|
297 }
|
Chris@102
|
298
|
Chris@102
|
299 template <class T, class Clock>
|
Chris@102
|
300 T sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&)
|
Chris@102
|
301 {
|
Chris@102
|
302 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
|
Chris@102
|
303 return boost::move(super::data_.pull().data);
|
Chris@102
|
304 #else
|
Chris@102
|
305 return super::data_.pull().data;
|
Chris@102
|
306 #endif
|
Chris@102
|
307 }
|
Chris@102
|
308 template <class T, class Clock>
|
Chris@102
|
309 T sync_timed_queue<T, Clock>::pull()
|
Chris@102
|
310 {
|
Chris@102
|
311 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
312 super::wait_until_not_empty(lk);
|
Chris@102
|
313 return pull_when_time_reached(lk);
|
Chris@102
|
314 }
|
Chris@102
|
315
|
Chris@102
|
316 ///////////////////////////
|
Chris@102
|
317 template <class T, class Clock>
|
Chris@102
|
318 void sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&, T& elem)
|
Chris@102
|
319 {
|
Chris@102
|
320 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
|
Chris@102
|
321 elem = boost::move(super::data_.pull().data);
|
Chris@102
|
322 #else
|
Chris@102
|
323 elem = super::data_.pull().data;
|
Chris@102
|
324 #endif
|
Chris@102
|
325 }
|
Chris@102
|
326
|
Chris@102
|
327 template <class T, class Clock>
|
Chris@102
|
328 void sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&, T& elem)
|
Chris@102
|
329 {
|
Chris@102
|
330 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
|
Chris@102
|
331 elem = boost::move(super::data_.pull().data);
|
Chris@102
|
332 #else
|
Chris@102
|
333 elem = super::data_.pull().data;
|
Chris@102
|
334 #endif
|
Chris@102
|
335 }
|
Chris@102
|
336
|
Chris@102
|
337 template <class T, class Clock>
|
Chris@102
|
338 void sync_timed_queue<T, Clock>::pull(T& elem)
|
Chris@102
|
339 {
|
Chris@102
|
340 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
341 super::wait_until_not_empty(lk);
|
Chris@102
|
342 elem = pull_when_time_reached(lk);
|
Chris@102
|
343 }
|
Chris@102
|
344
|
Chris@102
|
345 //////////////////////
|
Chris@102
|
346 template <class T, class Clock>
|
Chris@102
|
347 template <class Duration>
|
Chris@102
|
348 queue_op_status
|
Chris@102
|
349 sync_timed_queue<T, Clock>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
|
Chris@102
|
350 {
|
Chris@102
|
351 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
352
|
Chris@102
|
353 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
|
Chris@102
|
354 return queue_op_status::timeout;
|
Chris@102
|
355 return pull_when_time_reached_until(lk, tp, elem);
|
Chris@102
|
356 }
|
Chris@102
|
357
|
Chris@102
|
358 //////////////////////
|
Chris@102
|
359 template <class T, class Clock>
|
Chris@102
|
360 template <class Rep, class Period>
|
Chris@102
|
361 queue_op_status
|
Chris@102
|
362 sync_timed_queue<T, Clock>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
|
Chris@102
|
363 {
|
Chris@102
|
364 return pull_until(clock::now() + dura, elem);
|
Chris@102
|
365 }
|
Chris@102
|
366
|
Chris@102
|
367 ///////////////////////////
|
Chris@102
|
368 template <class T, class Clock>
|
Chris@102
|
369 queue_op_status sync_timed_queue<T, Clock>::try_pull(unique_lock<mutex>& lk, T& elem)
|
Chris@102
|
370 {
|
Chris@102
|
371 if ( super::empty(lk) )
|
Chris@102
|
372 {
|
Chris@102
|
373 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
374 return queue_op_status::empty;
|
Chris@102
|
375 }
|
Chris@102
|
376 if ( time_not_reached(lk) )
|
Chris@102
|
377 {
|
Chris@102
|
378 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
379 return queue_op_status::not_ready;
|
Chris@102
|
380 }
|
Chris@102
|
381
|
Chris@102
|
382 pull(lk, elem);
|
Chris@102
|
383 return queue_op_status::success;
|
Chris@102
|
384 }
|
Chris@102
|
385 template <class T, class Clock>
|
Chris@102
|
386 queue_op_status sync_timed_queue<T, Clock>::try_pull(lock_guard<mutex>& lk, T& elem)
|
Chris@102
|
387 {
|
Chris@102
|
388 if ( super::empty(lk) )
|
Chris@102
|
389 {
|
Chris@102
|
390 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
391 return queue_op_status::empty;
|
Chris@102
|
392 }
|
Chris@102
|
393 if ( time_not_reached(lk) )
|
Chris@102
|
394 {
|
Chris@102
|
395 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
396 return queue_op_status::not_ready;
|
Chris@102
|
397 }
|
Chris@102
|
398 pull(lk, elem);
|
Chris@102
|
399 return queue_op_status::success;
|
Chris@102
|
400 }
|
Chris@102
|
401
|
Chris@102
|
402 template <class T, class Clock>
|
Chris@102
|
403 queue_op_status sync_timed_queue<T, Clock>::try_pull(T& elem)
|
Chris@102
|
404 {
|
Chris@102
|
405 lock_guard<mutex> lk(super::mtx_);
|
Chris@102
|
406 return try_pull(lk, elem);
|
Chris@102
|
407 }
|
Chris@102
|
408
|
Chris@102
|
409 ///////////////////////////
|
Chris@102
|
410 template <class T, class Clock>
|
Chris@102
|
411 queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex>& lk, T& elem)
|
Chris@102
|
412 {
|
Chris@102
|
413 if (super::empty(lk))
|
Chris@102
|
414 {
|
Chris@102
|
415 if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
416 }
|
Chris@102
|
417 bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
|
Chris@102
|
418 if (has_been_closed) return queue_op_status::closed;
|
Chris@102
|
419 pull(lk, elem);
|
Chris@102
|
420 return queue_op_status::success;
|
Chris@102
|
421 }
|
Chris@102
|
422
|
Chris@102
|
423 template <class T, class Clock>
|
Chris@102
|
424 queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
|
Chris@102
|
425 {
|
Chris@102
|
426 unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
427 return wait_pull(lk, elem);
|
Chris@102
|
428 }
|
Chris@102
|
429
|
Chris@102
|
430 // ///////////////////////////
|
Chris@102
|
431 // template <class T, class Clock>
|
Chris@102
|
432 // queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex> &lk, T& elem)
|
Chris@102
|
433 // {
|
Chris@102
|
434 // if (super::empty(lk))
|
Chris@102
|
435 // {
|
Chris@102
|
436 // if (super::closed(lk)) return queue_op_status::closed;
|
Chris@102
|
437 // }
|
Chris@102
|
438 // bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
|
Chris@102
|
439 // if (has_been_closed) return queue_op_status::closed;
|
Chris@102
|
440 // pull(lk, elem);
|
Chris@102
|
441 // return queue_op_status::success;
|
Chris@102
|
442 // }
|
Chris@102
|
443 // template <class T>
|
Chris@102
|
444 // queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
|
Chris@102
|
445 // {
|
Chris@102
|
446 // unique_lock<mutex> lk(super::mtx_);
|
Chris@102
|
447 // return wait_pull(lk, elem);
|
Chris@102
|
448 // }
|
Chris@102
|
449
|
Chris@102
|
450 ///////////////////////////
|
Chris@102
|
451 template <class T, class Clock>
|
Chris@102
|
452 queue_op_status sync_timed_queue<T, Clock>::nonblocking_pull(T& elem)
|
Chris@102
|
453 {
|
Chris@102
|
454 unique_lock<mutex> lk(super::mtx_, try_to_lock);
|
Chris@102
|
455 if (! lk.owns_lock()) return queue_op_status::busy;
|
Chris@102
|
456 return try_pull(lk, elem);
|
Chris@102
|
457 }
|
Chris@102
|
458
|
Chris@102
|
459 } //end concurrent namespace
|
Chris@102
|
460
|
Chris@102
|
461 using concurrent::sync_timed_queue;
|
Chris@102
|
462
|
Chris@102
|
463 } //end boost namespace
|
Chris@102
|
464 #include <boost/config/abi_suffix.hpp>
|
Chris@102
|
465
|
Chris@102
|
466 #endif
|