annotate DEPENDENCIES/generic/include/boost/thread/concurrent_queues/sync_timed_queue.hpp @ 125:34e428693f5d vext

Vext -> Repoint
author Chris Cannam
date Thu, 14 Jun 2018 11:15:39 +0100
parents f46d142149f5
children
rev   line source
Chris@102 1 // Copyright (C) 2014 Ian Forbed
Chris@102 2 // Copyright (C) 2014 Vicente J. Botet Escriba
Chris@102 3 //
Chris@102 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
Chris@102 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Chris@102 6 //
Chris@102 7
Chris@102 8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
Chris@102 9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
Chris@102 10
Chris@102 11 #include <boost/thread/detail/config.hpp>
Chris@102 12
Chris@102 13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
Chris@102 14 #include <boost/chrono/duration.hpp>
Chris@102 15 #include <boost/chrono/time_point.hpp>
Chris@102 16 #include <boost/chrono/system_clocks.hpp>
Chris@102 17 #include <boost/chrono/chrono_io.hpp>
Chris@102 18
Chris@102 19 #include <boost/config/abi_prefix.hpp>
Chris@102 20
Chris@102 21 namespace boost
Chris@102 22 {
Chris@102 23 namespace concurrent
Chris@102 24 {
Chris@102 25 namespace detail
Chris@102 26 {
Chris@102 27 template <class T, class Clock = chrono::steady_clock>
Chris@102 28 struct scheduled_type
Chris@102 29 {
Chris@102 30 typedef T value_type;
Chris@102 31 typedef Clock clock;
Chris@102 32 typedef typename clock::time_point time_point;
Chris@102 33 T data;
Chris@102 34 time_point time;
Chris@102 35
Chris@102 36 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
Chris@102 37
Chris@102 38 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
Chris@102 39 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
Chris@102 40
Chris@102 41 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
Chris@102 42 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
Chris@102 43 data = other.data;
Chris@102 44 time = other.time;
Chris@102 45 return *this;
Chris@102 46 }
Chris@102 47
Chris@102 48 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
Chris@102 49 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
Chris@102 50 data = boost::move(other.data);
Chris@102 51 time = other.time;
Chris@102 52 return *this;
Chris@102 53 }
Chris@102 54
Chris@102 55 bool time_not_reached() const
Chris@102 56 {
Chris@102 57 return time > clock::now();
Chris@102 58 }
Chris@102 59
Chris@102 60 bool operator <(const scheduled_type<T> other) const
Chris@102 61 {
Chris@102 62 return this->time > other.time;
Chris@102 63 }
Chris@102 64 }; //end struct
Chris@102 65
Chris@102 66 } //end detail namespace
Chris@102 67
Chris@102 68 template <class T, class Clock = chrono::steady_clock>
Chris@102 69 class sync_timed_queue
Chris@102 70 : private sync_priority_queue<detail::scheduled_type<T, Clock> >
Chris@102 71 {
Chris@102 72 typedef detail::scheduled_type<T> stype;
Chris@102 73 typedef sync_priority_queue<stype> super;
Chris@102 74 public:
Chris@102 75 typedef T value_type;
Chris@102 76 typedef Clock clock;
Chris@102 77 typedef typename clock::duration duration;
Chris@102 78 typedef typename clock::time_point time_point;
Chris@102 79 typedef typename super::underlying_queue_type underlying_queue_type;
Chris@102 80 typedef typename super::size_type size_type;
Chris@102 81 typedef typename super::op_status op_status;
Chris@102 82
Chris@102 83 sync_timed_queue() : super() {};
Chris@102 84 ~sync_timed_queue() {}
Chris@102 85
Chris@102 86 using super::size;
Chris@102 87 using super::empty;
Chris@102 88 using super::full;
Chris@102 89 using super::close;
Chris@102 90 using super::closed;
Chris@102 91
Chris@102 92 T pull();
Chris@102 93 void pull(T& elem);
Chris@102 94
Chris@102 95 template <class Duration>
Chris@102 96 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
Chris@102 97 template <class Rep, class Period>
Chris@102 98 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
Chris@102 99
Chris@102 100 queue_op_status try_pull(T& elem);
Chris@102 101 queue_op_status wait_pull(T& elem);
Chris@102 102 queue_op_status nonblocking_pull(T& elem);
Chris@102 103
Chris@102 104 template <class Duration>
Chris@102 105 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
Chris@102 106 template <class Rep, class Period>
Chris@102 107 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
Chris@102 108
Chris@102 109 template <class Duration>
Chris@102 110 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
Chris@102 111 template <class Rep, class Period>
Chris@102 112 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
Chris@102 113
Chris@102 114 template <class Duration>
Chris@102 115 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
Chris@102 116 template <class Rep, class Period>
Chris@102 117 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
Chris@102 118
Chris@102 119 template <class Duration>
Chris@102 120 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
Chris@102 121 template <class Rep, class Period>
Chris@102 122 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
Chris@102 123
Chris@102 124 private:
Chris@102 125 T pull(unique_lock<mutex>&);
Chris@102 126 T pull(lock_guard<mutex>&);
Chris@102 127
Chris@102 128 void pull(unique_lock<mutex>&, T& elem);
Chris@102 129 void pull(lock_guard<mutex>&, T& elem);
Chris@102 130
Chris@102 131 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
Chris@102 132 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
Chris@102 133
Chris@102 134 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
Chris@102 135
Chris@102 136 bool wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>&);
Chris@102 137 T pull_when_time_reached(unique_lock<mutex>&);
Chris@102 138 template <class Duration>
Chris@102 139 queue_op_status pull_when_time_reached_until(unique_lock<mutex>&, chrono::time_point<clock,Duration> const& tp, T& elem);
Chris@102 140 bool time_not_reached(unique_lock<mutex>&);
Chris@102 141 bool time_not_reached(lock_guard<mutex>&);
Chris@102 142 bool empty_or_time_not_reached(unique_lock<mutex>&);
Chris@102 143 bool empty_or_time_not_reached(lock_guard<mutex>&);
Chris@102 144
Chris@102 145 sync_timed_queue(const sync_timed_queue&);
Chris@102 146 sync_timed_queue& operator=(const sync_timed_queue&);
Chris@102 147 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
Chris@102 148 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
Chris@102 149 }; //end class
Chris@102 150
Chris@102 151
Chris@102 152 template <class T, class Clock>
Chris@102 153 template <class Duration>
Chris@102 154 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
Chris@102 155 {
Chris@102 156 super::push(stype(elem,tp));
Chris@102 157 }
Chris@102 158
Chris@102 159 template <class T, class Clock>
Chris@102 160 template <class Rep, class Period>
Chris@102 161 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
Chris@102 162 {
Chris@102 163 push(elem, clock::now() + dura);
Chris@102 164 }
Chris@102 165
Chris@102 166 template <class T, class Clock>
Chris@102 167 template <class Duration>
Chris@102 168 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
Chris@102 169 {
Chris@102 170 super::push(stype(boost::move(elem),tp));
Chris@102 171 }
Chris@102 172
Chris@102 173 template <class T, class Clock>
Chris@102 174 template <class Rep, class Period>
Chris@102 175 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
Chris@102 176 {
Chris@102 177 push(boost::move(elem), clock::now() + dura);
Chris@102 178 }
Chris@102 179
Chris@102 180
Chris@102 181
Chris@102 182 template <class T, class Clock>
Chris@102 183 template <class Duration>
Chris@102 184 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
Chris@102 185 {
Chris@102 186 return super::try_push(stype(elem,tp));
Chris@102 187 }
Chris@102 188
Chris@102 189 template <class T, class Clock>
Chris@102 190 template <class Rep, class Period>
Chris@102 191 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
Chris@102 192 {
Chris@102 193 return try_push(elem,clock::now() + dura);
Chris@102 194 }
Chris@102 195
Chris@102 196 template <class T, class Clock>
Chris@102 197 template <class Duration>
Chris@102 198 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
Chris@102 199 {
Chris@102 200 return super::try_push(stype(boost::move(elem), tp));
Chris@102 201 }
Chris@102 202
Chris@102 203 template <class T, class Clock>
Chris@102 204 template <class Rep, class Period>
Chris@102 205 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
Chris@102 206 {
Chris@102 207 return try_push(boost::move(elem), clock::now() + dura);
Chris@102 208 }
Chris@102 209
Chris@102 210 ///////////////////////////
Chris@102 211 template <class T, class Clock>
Chris@102 212 bool sync_timed_queue<T, Clock>::time_not_reached(unique_lock<mutex>&)
Chris@102 213 {
Chris@102 214 return super::data_.top().time_not_reached();
Chris@102 215 }
Chris@102 216
Chris@102 217 template <class T, class Clock>
Chris@102 218 bool sync_timed_queue<T, Clock>::time_not_reached(lock_guard<mutex>&)
Chris@102 219 {
Chris@102 220 return super::data_.top().time_not_reached();
Chris@102 221 }
Chris@102 222
Chris@102 223 ///////////////////////////
Chris@102 224 template <class T, class Clock>
Chris@102 225 bool sync_timed_queue<T, Clock>::wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>& lk)
Chris@102 226 {
Chris@102 227 for (;;)
Chris@102 228 {
Chris@102 229 if (super::closed(lk)) return true;
Chris@102 230 while (! super::empty(lk)) {
Chris@102 231 if (! time_not_reached(lk)) return false;
Chris@102 232 super::not_empty_.wait_until(lk, super::data_.top().time);
Chris@102 233 if (super::closed(lk)) return true;
Chris@102 234 }
Chris@102 235 if (super::closed(lk)) return true;
Chris@102 236 super::not_empty_.wait(lk);
Chris@102 237 }
Chris@102 238 return false;
Chris@102 239 }
Chris@102 240
Chris@102 241 ///////////////////////////
Chris@102 242 template <class T, class Clock>
Chris@102 243 T sync_timed_queue<T, Clock>::pull_when_time_reached(unique_lock<mutex>& lk)
Chris@102 244 {
Chris@102 245 while (time_not_reached(lk))
Chris@102 246 {
Chris@102 247 super::throw_if_closed(lk);
Chris@102 248 super::not_empty_.wait_until(lk,super::data_.top().time);
Chris@102 249 super::wait_until_not_empty(lk);
Chris@102 250 }
Chris@102 251 return pull(lk);
Chris@102 252 }
Chris@102 253
Chris@102 254 template <class T, class Clock>
Chris@102 255 template <class Duration>
Chris@102 256 queue_op_status
Chris@102 257 sync_timed_queue<T, Clock>::pull_when_time_reached_until(unique_lock<mutex>& lk, chrono::time_point<clock,Duration> const& tp, T& elem)
Chris@102 258 {
Chris@102 259 chrono::time_point<clock,Duration> tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time;
Chris@102 260 while (time_not_reached(lk))
Chris@102 261 {
Chris@102 262 super::throw_if_closed(lk);
Chris@102 263 if (queue_op_status::timeout == super::not_empty_.wait_until(lk, tpmin)) {
Chris@102 264 if (time_not_reached(lk)) return queue_op_status::not_ready;
Chris@102 265 return queue_op_status::timeout;
Chris@102 266 }
Chris@102 267 }
Chris@102 268 pull(lk, elem);
Chris@102 269 return queue_op_status::success;
Chris@102 270 }
Chris@102 271
Chris@102 272 ///////////////////////////
Chris@102 273 template <class T, class Clock>
Chris@102 274 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(unique_lock<mutex>& lk)
Chris@102 275 {
Chris@102 276 if ( super::empty(lk) ) return true;
Chris@102 277 if ( time_not_reached(lk) ) return true;
Chris@102 278 return false;
Chris@102 279 }
Chris@102 280 template <class T, class Clock>
Chris@102 281 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(lock_guard<mutex>& lk)
Chris@102 282 {
Chris@102 283 if ( super::empty(lk) ) return true;
Chris@102 284 if ( time_not_reached(lk) ) return true;
Chris@102 285 return false;
Chris@102 286 }
Chris@102 287
Chris@102 288 ///////////////////////////
Chris@102 289 template <class T, class Clock>
Chris@102 290 T sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&)
Chris@102 291 {
Chris@102 292 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
Chris@102 293 return boost::move(super::data_.pull().data);
Chris@102 294 #else
Chris@102 295 return super::data_.pull().data;
Chris@102 296 #endif
Chris@102 297 }
Chris@102 298
Chris@102 299 template <class T, class Clock>
Chris@102 300 T sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&)
Chris@102 301 {
Chris@102 302 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
Chris@102 303 return boost::move(super::data_.pull().data);
Chris@102 304 #else
Chris@102 305 return super::data_.pull().data;
Chris@102 306 #endif
Chris@102 307 }
Chris@102 308 template <class T, class Clock>
Chris@102 309 T sync_timed_queue<T, Clock>::pull()
Chris@102 310 {
Chris@102 311 unique_lock<mutex> lk(super::mtx_);
Chris@102 312 super::wait_until_not_empty(lk);
Chris@102 313 return pull_when_time_reached(lk);
Chris@102 314 }
Chris@102 315
Chris@102 316 ///////////////////////////
Chris@102 317 template <class T, class Clock>
Chris@102 318 void sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&, T& elem)
Chris@102 319 {
Chris@102 320 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
Chris@102 321 elem = boost::move(super::data_.pull().data);
Chris@102 322 #else
Chris@102 323 elem = super::data_.pull().data;
Chris@102 324 #endif
Chris@102 325 }
Chris@102 326
Chris@102 327 template <class T, class Clock>
Chris@102 328 void sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&, T& elem)
Chris@102 329 {
Chris@102 330 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
Chris@102 331 elem = boost::move(super::data_.pull().data);
Chris@102 332 #else
Chris@102 333 elem = super::data_.pull().data;
Chris@102 334 #endif
Chris@102 335 }
Chris@102 336
Chris@102 337 template <class T, class Clock>
Chris@102 338 void sync_timed_queue<T, Clock>::pull(T& elem)
Chris@102 339 {
Chris@102 340 unique_lock<mutex> lk(super::mtx_);
Chris@102 341 super::wait_until_not_empty(lk);
Chris@102 342 elem = pull_when_time_reached(lk);
Chris@102 343 }
Chris@102 344
Chris@102 345 //////////////////////
Chris@102 346 template <class T, class Clock>
Chris@102 347 template <class Duration>
Chris@102 348 queue_op_status
Chris@102 349 sync_timed_queue<T, Clock>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
Chris@102 350 {
Chris@102 351 unique_lock<mutex> lk(super::mtx_);
Chris@102 352
Chris@102 353 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
Chris@102 354 return queue_op_status::timeout;
Chris@102 355 return pull_when_time_reached_until(lk, tp, elem);
Chris@102 356 }
Chris@102 357
Chris@102 358 //////////////////////
Chris@102 359 template <class T, class Clock>
Chris@102 360 template <class Rep, class Period>
Chris@102 361 queue_op_status
Chris@102 362 sync_timed_queue<T, Clock>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
Chris@102 363 {
Chris@102 364 return pull_until(clock::now() + dura, elem);
Chris@102 365 }
Chris@102 366
Chris@102 367 ///////////////////////////
Chris@102 368 template <class T, class Clock>
Chris@102 369 queue_op_status sync_timed_queue<T, Clock>::try_pull(unique_lock<mutex>& lk, T& elem)
Chris@102 370 {
Chris@102 371 if ( super::empty(lk) )
Chris@102 372 {
Chris@102 373 if (super::closed(lk)) return queue_op_status::closed;
Chris@102 374 return queue_op_status::empty;
Chris@102 375 }
Chris@102 376 if ( time_not_reached(lk) )
Chris@102 377 {
Chris@102 378 if (super::closed(lk)) return queue_op_status::closed;
Chris@102 379 return queue_op_status::not_ready;
Chris@102 380 }
Chris@102 381
Chris@102 382 pull(lk, elem);
Chris@102 383 return queue_op_status::success;
Chris@102 384 }
Chris@102 385 template <class T, class Clock>
Chris@102 386 queue_op_status sync_timed_queue<T, Clock>::try_pull(lock_guard<mutex>& lk, T& elem)
Chris@102 387 {
Chris@102 388 if ( super::empty(lk) )
Chris@102 389 {
Chris@102 390 if (super::closed(lk)) return queue_op_status::closed;
Chris@102 391 return queue_op_status::empty;
Chris@102 392 }
Chris@102 393 if ( time_not_reached(lk) )
Chris@102 394 {
Chris@102 395 if (super::closed(lk)) return queue_op_status::closed;
Chris@102 396 return queue_op_status::not_ready;
Chris@102 397 }
Chris@102 398 pull(lk, elem);
Chris@102 399 return queue_op_status::success;
Chris@102 400 }
Chris@102 401
Chris@102 402 template <class T, class Clock>
Chris@102 403 queue_op_status sync_timed_queue<T, Clock>::try_pull(T& elem)
Chris@102 404 {
Chris@102 405 lock_guard<mutex> lk(super::mtx_);
Chris@102 406 return try_pull(lk, elem);
Chris@102 407 }
Chris@102 408
Chris@102 409 ///////////////////////////
Chris@102 410 template <class T, class Clock>
Chris@102 411 queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex>& lk, T& elem)
Chris@102 412 {
Chris@102 413 if (super::empty(lk))
Chris@102 414 {
Chris@102 415 if (super::closed(lk)) return queue_op_status::closed;
Chris@102 416 }
Chris@102 417 bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
Chris@102 418 if (has_been_closed) return queue_op_status::closed;
Chris@102 419 pull(lk, elem);
Chris@102 420 return queue_op_status::success;
Chris@102 421 }
Chris@102 422
Chris@102 423 template <class T, class Clock>
Chris@102 424 queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
Chris@102 425 {
Chris@102 426 unique_lock<mutex> lk(super::mtx_);
Chris@102 427 return wait_pull(lk, elem);
Chris@102 428 }
Chris@102 429
Chris@102 430 // ///////////////////////////
Chris@102 431 // template <class T, class Clock>
Chris@102 432 // queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex> &lk, T& elem)
Chris@102 433 // {
Chris@102 434 // if (super::empty(lk))
Chris@102 435 // {
Chris@102 436 // if (super::closed(lk)) return queue_op_status::closed;
Chris@102 437 // }
Chris@102 438 // bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
Chris@102 439 // if (has_been_closed) return queue_op_status::closed;
Chris@102 440 // pull(lk, elem);
Chris@102 441 // return queue_op_status::success;
Chris@102 442 // }
Chris@102 443 // template <class T>
Chris@102 444 // queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
Chris@102 445 // {
Chris@102 446 // unique_lock<mutex> lk(super::mtx_);
Chris@102 447 // return wait_pull(lk, elem);
Chris@102 448 // }
Chris@102 449
Chris@102 450 ///////////////////////////
Chris@102 451 template <class T, class Clock>
Chris@102 452 queue_op_status sync_timed_queue<T, Clock>::nonblocking_pull(T& elem)
Chris@102 453 {
Chris@102 454 unique_lock<mutex> lk(super::mtx_, try_to_lock);
Chris@102 455 if (! lk.owns_lock()) return queue_op_status::busy;
Chris@102 456 return try_pull(lk, elem);
Chris@102 457 }
Chris@102 458
Chris@102 459 } //end concurrent namespace
Chris@102 460
Chris@102 461 using concurrent::sync_timed_queue;
Chris@102 462
Chris@102 463 } //end boost namespace
Chris@102 464 #include <boost/config/abi_suffix.hpp>
Chris@102 465
Chris@102 466 #endif