Chris@102: // Copyright (C) 2014 Ian Forbed Chris@102: // Copyright (C) 2014 Vicente J. Botet Escriba Chris@102: // Chris@102: // Distributed under the Boost Software License, Version 1.0. (See accompanying Chris@102: // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@102: // Chris@102: Chris@102: #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP Chris@102: #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP Chris@102: Chris@102: #include Chris@102: Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: Chris@102: #include Chris@102: Chris@102: namespace boost Chris@102: { Chris@102: namespace concurrent Chris@102: { Chris@102: namespace detail Chris@102: { Chris@102: template Chris@102: struct scheduled_type Chris@102: { Chris@102: typedef T value_type; Chris@102: typedef Clock clock; Chris@102: typedef typename clock::time_point time_point; Chris@102: T data; Chris@102: time_point time; Chris@102: Chris@102: BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type) Chris@102: Chris@102: scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {} Chris@102: scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {} Chris@102: Chris@102: scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {} Chris@102: scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) { Chris@102: data = other.data; Chris@102: time = other.time; Chris@102: return *this; Chris@102: } Chris@102: Chris@102: scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {} Chris@102: scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) { Chris@102: data = boost::move(other.data); Chris@102: time = other.time; Chris@102: return *this; Chris@102: } Chris@102: Chris@102: bool time_not_reached() const Chris@102: { Chris@102: return time > clock::now(); Chris@102: } Chris@102: Chris@102: bool operator <(const scheduled_type other) const Chris@102: { Chris@102: return this->time > other.time; Chris@102: } Chris@102: }; //end struct Chris@102: Chris@102: } //end detail namespace Chris@102: Chris@102: template Chris@102: class sync_timed_queue Chris@102: : private sync_priority_queue > Chris@102: { Chris@102: typedef detail::scheduled_type stype; Chris@102: typedef sync_priority_queue super; Chris@102: public: Chris@102: typedef T value_type; Chris@102: typedef Clock clock; Chris@102: typedef typename clock::duration duration; Chris@102: typedef typename clock::time_point time_point; Chris@102: typedef typename super::underlying_queue_type underlying_queue_type; Chris@102: typedef typename super::size_type size_type; Chris@102: typedef typename super::op_status op_status; Chris@102: Chris@102: sync_timed_queue() : super() {}; Chris@102: ~sync_timed_queue() {} Chris@102: Chris@102: using super::size; Chris@102: using super::empty; Chris@102: using super::full; Chris@102: using super::close; Chris@102: using super::closed; Chris@102: Chris@102: T pull(); Chris@102: void pull(T& elem); Chris@102: Chris@102: template Chris@102: queue_op_status pull_until(chrono::time_point const& tp, T& elem); Chris@102: template Chris@102: queue_op_status pull_for(chrono::duration const& dura, T& elem); Chris@102: Chris@102: queue_op_status try_pull(T& elem); Chris@102: queue_op_status wait_pull(T& elem); Chris@102: queue_op_status nonblocking_pull(T& elem); Chris@102: Chris@102: template Chris@102: void push(const T& elem, chrono::time_point const& tp); Chris@102: template Chris@102: void push(const T& elem, chrono::duration const& dura); Chris@102: Chris@102: template Chris@102: void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp); Chris@102: template Chris@102: void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura); Chris@102: Chris@102: template Chris@102: queue_op_status try_push(const T& elem, chrono::time_point const& tp); Chris@102: template Chris@102: queue_op_status try_push(const T& elem, chrono::duration const& dura); Chris@102: Chris@102: template Chris@102: queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp); Chris@102: template Chris@102: queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura); Chris@102: Chris@102: private: Chris@102: T pull(unique_lock&); Chris@102: T pull(lock_guard&); Chris@102: Chris@102: void pull(unique_lock&, T& elem); Chris@102: void pull(lock_guard&, T& elem); Chris@102: Chris@102: queue_op_status try_pull(unique_lock&, T& elem); Chris@102: queue_op_status try_pull(lock_guard&, T& elem); Chris@102: Chris@102: queue_op_status wait_pull(unique_lock& lk, T& elem); Chris@102: Chris@102: bool wait_until_not_empty_time_reached_or_closed(unique_lock&); Chris@102: T pull_when_time_reached(unique_lock&); Chris@102: template Chris@102: queue_op_status pull_when_time_reached_until(unique_lock&, chrono::time_point const& tp, T& elem); Chris@102: bool time_not_reached(unique_lock&); Chris@102: bool time_not_reached(lock_guard&); Chris@102: bool empty_or_time_not_reached(unique_lock&); Chris@102: bool empty_or_time_not_reached(lock_guard&); Chris@102: Chris@102: sync_timed_queue(const sync_timed_queue&); Chris@102: sync_timed_queue& operator=(const sync_timed_queue&); Chris@102: sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue)); Chris@102: sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue)); Chris@102: }; //end class Chris@102: Chris@102: Chris@102: template Chris@102: template Chris@102: void sync_timed_queue::push(const T& elem, chrono::time_point const& tp) Chris@102: { Chris@102: super::push(stype(elem,tp)); Chris@102: } Chris@102: Chris@102: template Chris@102: template Chris@102: void sync_timed_queue::push(const T& elem, chrono::duration const& dura) Chris@102: { Chris@102: push(elem, clock::now() + dura); Chris@102: } Chris@102: Chris@102: template Chris@102: template Chris@102: void sync_timed_queue::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp) Chris@102: { Chris@102: super::push(stype(boost::move(elem),tp)); Chris@102: } Chris@102: Chris@102: template Chris@102: template Chris@102: void sync_timed_queue::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura) Chris@102: { Chris@102: push(boost::move(elem), clock::now() + dura); Chris@102: } Chris@102: Chris@102: Chris@102: Chris@102: template Chris@102: template Chris@102: queue_op_status sync_timed_queue::try_push(const T& elem, chrono::time_point const& tp) Chris@102: { Chris@102: return super::try_push(stype(elem,tp)); Chris@102: } Chris@102: Chris@102: template Chris@102: template Chris@102: queue_op_status sync_timed_queue::try_push(const T& elem, chrono::duration const& dura) Chris@102: { Chris@102: return try_push(elem,clock::now() + dura); Chris@102: } Chris@102: Chris@102: template Chris@102: template Chris@102: queue_op_status sync_timed_queue::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp) Chris@102: { Chris@102: return super::try_push(stype(boost::move(elem), tp)); Chris@102: } Chris@102: Chris@102: template Chris@102: template Chris@102: queue_op_status sync_timed_queue::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura) Chris@102: { Chris@102: return try_push(boost::move(elem), clock::now() + dura); Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: bool sync_timed_queue::time_not_reached(unique_lock&) Chris@102: { Chris@102: return super::data_.top().time_not_reached(); Chris@102: } Chris@102: Chris@102: template Chris@102: bool sync_timed_queue::time_not_reached(lock_guard&) Chris@102: { Chris@102: return super::data_.top().time_not_reached(); Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: bool sync_timed_queue::wait_until_not_empty_time_reached_or_closed(unique_lock& lk) Chris@102: { Chris@102: for (;;) Chris@102: { Chris@102: if (super::closed(lk)) return true; Chris@102: while (! super::empty(lk)) { Chris@102: if (! time_not_reached(lk)) return false; Chris@102: super::not_empty_.wait_until(lk, super::data_.top().time); Chris@102: if (super::closed(lk)) return true; Chris@102: } Chris@102: if (super::closed(lk)) return true; Chris@102: super::not_empty_.wait(lk); Chris@102: } Chris@102: return false; Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: T sync_timed_queue::pull_when_time_reached(unique_lock& lk) Chris@102: { Chris@102: while (time_not_reached(lk)) Chris@102: { Chris@102: super::throw_if_closed(lk); Chris@102: super::not_empty_.wait_until(lk,super::data_.top().time); Chris@102: super::wait_until_not_empty(lk); Chris@102: } Chris@102: return pull(lk); Chris@102: } Chris@102: Chris@102: template Chris@102: template Chris@102: queue_op_status Chris@102: sync_timed_queue::pull_when_time_reached_until(unique_lock& lk, chrono::time_point const& tp, T& elem) Chris@102: { Chris@102: chrono::time_point tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time; Chris@102: while (time_not_reached(lk)) Chris@102: { Chris@102: super::throw_if_closed(lk); Chris@102: if (queue_op_status::timeout == super::not_empty_.wait_until(lk, tpmin)) { Chris@102: if (time_not_reached(lk)) return queue_op_status::not_ready; Chris@102: return queue_op_status::timeout; Chris@102: } Chris@102: } Chris@102: pull(lk, elem); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: bool sync_timed_queue::empty_or_time_not_reached(unique_lock& lk) Chris@102: { Chris@102: if ( super::empty(lk) ) return true; Chris@102: if ( time_not_reached(lk) ) return true; Chris@102: return false; Chris@102: } Chris@102: template Chris@102: bool sync_timed_queue::empty_or_time_not_reached(lock_guard& lk) Chris@102: { Chris@102: if ( super::empty(lk) ) return true; Chris@102: if ( time_not_reached(lk) ) return true; Chris@102: return false; Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: T sync_timed_queue::pull(unique_lock&) Chris@102: { Chris@102: #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES Chris@102: return boost::move(super::data_.pull().data); Chris@102: #else Chris@102: return super::data_.pull().data; Chris@102: #endif Chris@102: } Chris@102: Chris@102: template Chris@102: T sync_timed_queue::pull(lock_guard&) Chris@102: { Chris@102: #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES Chris@102: return boost::move(super::data_.pull().data); Chris@102: #else Chris@102: return super::data_.pull().data; Chris@102: #endif Chris@102: } Chris@102: template Chris@102: T sync_timed_queue::pull() Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: super::wait_until_not_empty(lk); Chris@102: return pull_when_time_reached(lk); Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: void sync_timed_queue::pull(unique_lock&, T& elem) Chris@102: { Chris@102: #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES Chris@102: elem = boost::move(super::data_.pull().data); Chris@102: #else Chris@102: elem = super::data_.pull().data; Chris@102: #endif Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_timed_queue::pull(lock_guard&, T& elem) Chris@102: { Chris@102: #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES Chris@102: elem = boost::move(super::data_.pull().data); Chris@102: #else Chris@102: elem = super::data_.pull().data; Chris@102: #endif Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_timed_queue::pull(T& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: super::wait_until_not_empty(lk); Chris@102: elem = pull_when_time_reached(lk); Chris@102: } Chris@102: Chris@102: ////////////////////// Chris@102: template Chris@102: template Chris@102: queue_op_status Chris@102: sync_timed_queue::pull_until(chrono::time_point const& tp, T& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: Chris@102: if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp)) Chris@102: return queue_op_status::timeout; Chris@102: return pull_when_time_reached_until(lk, tp, elem); Chris@102: } Chris@102: Chris@102: ////////////////////// Chris@102: template Chris@102: template Chris@102: queue_op_status Chris@102: sync_timed_queue::pull_for(chrono::duration const& dura, T& elem) Chris@102: { Chris@102: return pull_until(clock::now() + dura, elem); Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: queue_op_status sync_timed_queue::try_pull(unique_lock& lk, T& elem) Chris@102: { Chris@102: if ( super::empty(lk) ) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: return queue_op_status::empty; Chris@102: } Chris@102: if ( time_not_reached(lk) ) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: return queue_op_status::not_ready; Chris@102: } Chris@102: Chris@102: pull(lk, elem); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: template Chris@102: queue_op_status sync_timed_queue::try_pull(lock_guard& lk, T& elem) Chris@102: { Chris@102: if ( super::empty(lk) ) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: return queue_op_status::empty; Chris@102: } Chris@102: if ( time_not_reached(lk) ) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: return queue_op_status::not_ready; Chris@102: } Chris@102: pull(lk, elem); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_timed_queue::try_pull(T& elem) Chris@102: { Chris@102: lock_guard lk(super::mtx_); Chris@102: return try_pull(lk, elem); Chris@102: } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: queue_op_status sync_timed_queue::wait_pull(unique_lock& lk, T& elem) Chris@102: { Chris@102: if (super::empty(lk)) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: } Chris@102: bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk); Chris@102: if (has_been_closed) return queue_op_status::closed; Chris@102: pull(lk, elem); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_timed_queue::wait_pull(T& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: return wait_pull(lk, elem); Chris@102: } Chris@102: Chris@102: // /////////////////////////// Chris@102: // template Chris@102: // queue_op_status sync_timed_queue::wait_pull(unique_lock &lk, T& elem) Chris@102: // { Chris@102: // if (super::empty(lk)) Chris@102: // { Chris@102: // if (super::closed(lk)) return queue_op_status::closed; Chris@102: // } Chris@102: // bool has_been_closed = super::wait_until_not_empty_or_closed(lk); Chris@102: // if (has_been_closed) return queue_op_status::closed; Chris@102: // pull(lk, elem); Chris@102: // return queue_op_status::success; Chris@102: // } Chris@102: // template Chris@102: // queue_op_status sync_timed_queue::wait_pull(T& elem) Chris@102: // { Chris@102: // unique_lock lk(super::mtx_); Chris@102: // return wait_pull(lk, elem); Chris@102: // } Chris@102: Chris@102: /////////////////////////// Chris@102: template Chris@102: queue_op_status sync_timed_queue::nonblocking_pull(T& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_, try_to_lock); Chris@102: if (! lk.owns_lock()) return queue_op_status::busy; Chris@102: return try_pull(lk, elem); Chris@102: } Chris@102: Chris@102: } //end concurrent namespace Chris@102: Chris@102: using concurrent::sync_timed_queue; Chris@102: Chris@102: } //end boost namespace Chris@102: #include Chris@102: Chris@102: #endif