Chris@102: #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP Chris@102: #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP Chris@102: Chris@102: ////////////////////////////////////////////////////////////////////////////// Chris@102: // Chris@102: // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost Chris@102: // Software License, Version 1.0. (See accompanying file Chris@102: // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@102: // Chris@102: // See http://www.boost.org/libs/thread for documentation. Chris@102: // Chris@102: ////////////////////////////////////////////////////////////////////////////// Chris@102: Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: #include Chris@102: #include Chris@102: #endif Chris@102: #include Chris@102: Chris@102: namespace boost Chris@102: { Chris@102: namespace concurrent Chris@102: { Chris@102: template Chris@102: class sync_bounded_queue Chris@102: { Chris@102: public: Chris@102: typedef ValueType value_type; Chris@102: typedef std::size_t size_type; Chris@102: Chris@102: // Constructors/Assignment/Destructors Chris@102: BOOST_THREAD_NO_COPYABLE(sync_bounded_queue) Chris@102: explicit sync_bounded_queue(size_type max_elems); Chris@102: template Chris@102: sync_bounded_queue(size_type max_elems, Range range); Chris@102: ~sync_bounded_queue(); Chris@102: Chris@102: // Observers Chris@102: inline bool empty() const; Chris@102: inline bool full() const; Chris@102: inline size_type capacity() const; Chris@102: inline size_type size() const; Chris@102: inline bool closed() const; Chris@102: Chris@102: // Modifiers Chris@102: inline void close(); Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: inline void push(const value_type& x); Chris@102: inline void push(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline bool try_push(const value_type& x); Chris@102: inline bool try_push(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline bool try_push(no_block_tag, const value_type& x); Chris@102: inline bool try_push(no_block_tag, BOOST_THREAD_RV_REF(value_type) x); Chris@102: #endif Chris@102: inline void push_back(const value_type& x); Chris@102: inline void push_back(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline queue_op_status try_push_back(const value_type& x); Chris@102: inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline queue_op_status nonblocking_push_back(const value_type& x); Chris@102: inline queue_op_status nonblocking_push_back(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline queue_op_status wait_push_back(const value_type& x); Chris@102: inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x); Chris@102: Chris@102: // Observers/Modifiers Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: inline void pull(value_type&); Chris@102: // enable_if is_nothrow_copy_movable Chris@102: inline value_type pull(); Chris@102: inline shared_ptr ptr_pull(); Chris@102: inline bool try_pull(value_type&); Chris@102: inline bool try_pull(no_block_tag,value_type&); Chris@102: inline shared_ptr try_pull(); Chris@102: #endif Chris@102: inline void pull_front(value_type&); Chris@102: // enable_if is_nothrow_copy_movable Chris@102: inline value_type pull_front(); Chris@102: inline queue_op_status try_pull_front(value_type&); Chris@102: inline queue_op_status nonblocking_pull_front(value_type&); Chris@102: Chris@102: inline queue_op_status wait_pull_front(ValueType& elem); Chris@102: Chris@102: private: Chris@102: mutable mutex mtx_; Chris@102: condition_variable not_empty_; Chris@102: condition_variable not_full_; Chris@102: size_type waiting_full_; Chris@102: size_type waiting_empty_; Chris@102: value_type* data_; Chris@102: size_type in_; Chris@102: size_type out_; Chris@102: size_type capacity_; Chris@102: bool closed_; Chris@102: Chris@102: inline size_type inc(size_type idx) const BOOST_NOEXCEPT Chris@102: { Chris@102: return (idx + 1) % capacity_; Chris@102: } Chris@102: Chris@102: inline bool empty(unique_lock& ) const BOOST_NOEXCEPT Chris@102: { Chris@102: return in_ == out_; Chris@102: } Chris@102: inline bool empty(lock_guard& ) const BOOST_NOEXCEPT Chris@102: { Chris@102: return in_ == out_; Chris@102: } Chris@102: inline bool full(unique_lock& ) const BOOST_NOEXCEPT Chris@102: { Chris@102: return (inc(in_) == out_); Chris@102: } Chris@102: inline bool full(lock_guard& ) const BOOST_NOEXCEPT Chris@102: { Chris@102: return (inc(in_) == out_); Chris@102: } Chris@102: inline size_type capacity(lock_guard& ) const BOOST_NOEXCEPT Chris@102: { Chris@102: return capacity_-1; Chris@102: } Chris@102: inline size_type size(lock_guard& lk) const BOOST_NOEXCEPT Chris@102: { Chris@102: if (full(lk)) return capacity(lk); Chris@102: return ((out_+capacity(lk)-in_) % capacity(lk)); Chris@102: } Chris@102: Chris@102: inline void throw_if_closed(unique_lock&); Chris@102: inline bool closed(unique_lock&) const; Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: inline bool try_pull(value_type& x, unique_lock& lk); Chris@102: inline shared_ptr try_pull(unique_lock& lk); Chris@102: inline bool try_push(const value_type& x, unique_lock& lk); Chris@102: inline bool try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock& lk); Chris@102: #endif Chris@102: inline queue_op_status try_pull_front(value_type& x, unique_lock& lk); Chris@102: inline queue_op_status try_push_back(const value_type& x, unique_lock& lk); Chris@102: inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock& lk); Chris@102: Chris@102: inline queue_op_status wait_pull_front(value_type& x, unique_lock& lk); Chris@102: inline queue_op_status wait_push_back(const value_type& x, unique_lock& lk); Chris@102: inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock& lk); Chris@102: Chris@102: inline void wait_until_not_empty(unique_lock& lk); Chris@102: inline void wait_until_not_empty(unique_lock& lk, bool&); Chris@102: inline size_type wait_until_not_full(unique_lock& lk); Chris@102: inline size_type wait_until_not_full(unique_lock& lk, bool&); Chris@102: Chris@102: Chris@102: inline void notify_not_empty_if_needed(unique_lock& lk) Chris@102: { Chris@102: if (waiting_empty_ > 0) Chris@102: { Chris@102: --waiting_empty_; Chris@102: lk.unlock(); Chris@102: not_empty_.notify_one(); Chris@102: } Chris@102: } Chris@102: inline void notify_not_full_if_needed(unique_lock& lk) Chris@102: { Chris@102: if (waiting_full_ > 0) Chris@102: { Chris@102: --waiting_full_; Chris@102: lk.unlock(); Chris@102: not_full_.notify_one(); Chris@102: } Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: inline void pull(value_type& elem, unique_lock& lk) Chris@102: { Chris@102: elem = boost::move(data_[out_]); Chris@102: out_ = inc(out_); Chris@102: notify_not_full_if_needed(lk); Chris@102: } Chris@102: inline value_type pull(unique_lock& lk) Chris@102: { Chris@102: value_type elem = boost::move(data_[out_]); Chris@102: out_ = inc(out_); Chris@102: notify_not_full_if_needed(lk); Chris@102: return boost::move(elem); Chris@102: } Chris@102: inline boost::shared_ptr ptr_pull(unique_lock& lk) Chris@102: { Chris@102: shared_ptr res = make_shared(boost::move(data_[out_])); Chris@102: out_ = inc(out_); Chris@102: notify_not_full_if_needed(lk); Chris@102: return res; Chris@102: } Chris@102: #endif Chris@102: inline void pull_front(value_type& elem, unique_lock& lk) Chris@102: { Chris@102: elem = boost::move(data_[out_]); Chris@102: out_ = inc(out_); Chris@102: notify_not_full_if_needed(lk); Chris@102: } Chris@102: inline value_type pull_front(unique_lock& lk) Chris@102: { Chris@102: value_type elem = boost::move(data_[out_]); Chris@102: out_ = inc(out_); Chris@102: notify_not_full_if_needed(lk); Chris@102: return boost::move(elem); Chris@102: } Chris@102: Chris@102: inline void set_in(size_type in, unique_lock& lk) Chris@102: { Chris@102: in_ = in; Chris@102: notify_not_empty_if_needed(lk); Chris@102: } Chris@102: Chris@102: inline void push_at(const value_type& elem, size_type in_p_1, unique_lock& lk) Chris@102: { Chris@102: data_[in_] = elem; Chris@102: set_in(in_p_1, lk); Chris@102: } Chris@102: Chris@102: inline void push_at(BOOST_THREAD_RV_REF(value_type) elem, size_type in_p_1, unique_lock& lk) Chris@102: { Chris@102: data_[in_] = boost::move(elem); Chris@102: set_in(in_p_1, lk); Chris@102: } Chris@102: }; Chris@102: Chris@102: template Chris@102: sync_bounded_queue::sync_bounded_queue(typename sync_bounded_queue::size_type max_elems) : Chris@102: waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1), Chris@102: closed_(false) Chris@102: { Chris@102: BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1"); Chris@102: } Chris@102: Chris@102: // template Chris@102: // template Chris@102: // sync_bounded_queue::sync_bounded_queue(size_type max_elems, Range range) : Chris@102: // waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1), Chris@102: // closed_(false) Chris@102: // { Chris@102: // BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1"); Chris@102: // BOOST_ASSERT_MSG(max_elems == size(range), "number of elements must match range's size"); Chris@102: // try Chris@102: // { Chris@102: // typedef typename Range::iterator iterator_t; Chris@102: // iterator_t first = boost::begin(range); Chris@102: // iterator_t end = boost::end(range); Chris@102: // size_type in = 0; Chris@102: // for (iterator_t cur = first; cur != end; ++cur, ++in) Chris@102: // { Chris@102: // data_[in] = *cur; Chris@102: // } Chris@102: // set_in(in); Chris@102: // } Chris@102: // catch (...) Chris@102: // { Chris@102: // delete[] data_; Chris@102: // } Chris@102: // } Chris@102: Chris@102: template Chris@102: sync_bounded_queue::~sync_bounded_queue() Chris@102: { Chris@102: delete[] data_; Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_bounded_queue::close() Chris@102: { Chris@102: { Chris@102: lock_guard lk(mtx_); Chris@102: closed_ = true; Chris@102: } Chris@102: not_empty_.notify_all(); Chris@102: not_full_.notify_all(); Chris@102: } Chris@102: Chris@102: template Chris@102: bool sync_bounded_queue::closed() const Chris@102: { Chris@102: lock_guard lk(mtx_); Chris@102: return closed_; Chris@102: } Chris@102: template Chris@102: bool sync_bounded_queue::closed(unique_lock& ) const Chris@102: { Chris@102: return closed_; Chris@102: } Chris@102: Chris@102: template Chris@102: bool sync_bounded_queue::empty() const Chris@102: { Chris@102: lock_guard lk(mtx_); Chris@102: return empty(lk); Chris@102: } Chris@102: template Chris@102: bool sync_bounded_queue::full() const Chris@102: { Chris@102: lock_guard lk(mtx_); Chris@102: return full(lk); Chris@102: } Chris@102: Chris@102: template Chris@102: typename sync_bounded_queue::size_type sync_bounded_queue::capacity() const Chris@102: { Chris@102: lock_guard lk(mtx_); Chris@102: return capacity(lk); Chris@102: } Chris@102: Chris@102: template Chris@102: typename sync_bounded_queue::size_type sync_bounded_queue::size() const Chris@102: { Chris@102: lock_guard lk(mtx_); Chris@102: return size(lk); Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: bool sync_bounded_queue::try_pull(ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (empty(lk)) Chris@102: { Chris@102: throw_if_closed(lk); Chris@102: return false; Chris@102: } Chris@102: pull(elem, lk); Chris@102: return true; Chris@102: } Chris@102: template Chris@102: shared_ptr sync_bounded_queue::try_pull(unique_lock& lk) Chris@102: { Chris@102: if (empty(lk)) Chris@102: { Chris@102: throw_if_closed(lk); Chris@102: return shared_ptr(); Chris@102: } Chris@102: return ptr_pull(lk); Chris@102: } Chris@102: template Chris@102: bool sync_bounded_queue::try_pull(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_pull(elem, lk); Chris@102: } Chris@102: #endif Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::try_pull_front(ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (empty(lk)) Chris@102: { Chris@102: if (closed(lk)) return queue_op_status::closed; Chris@102: return queue_op_status::empty; Chris@102: } Chris@102: pull_front(elem, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::try_pull_front(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_pull_front(elem, lk); Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: bool sync_bounded_queue::try_pull(no_block_tag,ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) Chris@102: { Chris@102: return false; Chris@102: } Chris@102: return try_pull(elem, lk); Chris@102: } Chris@102: template Chris@102: boost::shared_ptr sync_bounded_queue::try_pull() Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_pull(lk); Chris@102: } Chris@102: #endif Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::nonblocking_pull_front(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) Chris@102: { Chris@102: return queue_op_status::busy; Chris@102: } Chris@102: return try_pull_front(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_bounded_queue::throw_if_closed(unique_lock&) Chris@102: { Chris@102: if (closed_) Chris@102: { Chris@102: BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); Chris@102: } Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_bounded_queue::wait_until_not_empty(unique_lock& lk) Chris@102: { Chris@102: for (;;) Chris@102: { Chris@102: if (out_ != in_) break; Chris@102: throw_if_closed(lk); Chris@102: ++waiting_empty_; Chris@102: not_empty_.wait(lk); Chris@102: } Chris@102: } Chris@102: template Chris@102: void sync_bounded_queue::wait_until_not_empty(unique_lock& lk, bool & closed) Chris@102: { Chris@102: for (;;) Chris@102: { Chris@102: if (out_ != in_) break; Chris@102: if (closed_) {closed=true; return;} Chris@102: ++waiting_empty_; Chris@102: not_empty_.wait(lk); Chris@102: } Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: void sync_bounded_queue::pull(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: wait_until_not_empty(lk); Chris@102: pull(elem, lk); Chris@102: } Chris@102: // template Chris@102: // void sync_bounded_queue::pull(ValueType& elem, bool & closed) Chris@102: // { Chris@102: // unique_lock lk(mtx_); Chris@102: // wait_until_not_empty(lk, closed); Chris@102: // if (closed) {return;} Chris@102: // pull(elem, lk); Chris@102: // } Chris@102: Chris@102: // enable if ValueType is nothrow movable Chris@102: template Chris@102: ValueType sync_bounded_queue::pull() Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: wait_until_not_empty(lk); Chris@102: return pull(lk); Chris@102: } Chris@102: template Chris@102: boost::shared_ptr sync_bounded_queue::ptr_pull() Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: wait_until_not_empty(lk); Chris@102: return ptr_pull(lk); Chris@102: } Chris@102: Chris@102: #endif Chris@102: Chris@102: template Chris@102: void sync_bounded_queue::pull_front(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: wait_until_not_empty(lk); Chris@102: pull_front(elem, lk); Chris@102: } Chris@102: Chris@102: // enable if ValueType is nothrow movable Chris@102: template Chris@102: ValueType sync_bounded_queue::pull_front() Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: wait_until_not_empty(lk); Chris@102: return pull_front(lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::wait_pull_front(ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (empty(lk) && closed(lk)) {return queue_op_status::closed;} Chris@102: wait_until_not_empty(lk); Chris@102: pull_front(elem, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: template Chris@102: queue_op_status sync_bounded_queue::wait_pull_front(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return wait_pull_front(elem, lk); Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: bool sync_bounded_queue::try_push(const ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: throw_if_closed(lk); Chris@102: size_type in_p_1 = inc(in_); Chris@102: if (in_p_1 == out_) // full() Chris@102: { Chris@102: return false; Chris@102: } Chris@102: push_at(elem, in_p_1, lk); Chris@102: return true; Chris@102: } Chris@102: template Chris@102: bool sync_bounded_queue::try_push(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_push(elem, lk); Chris@102: } Chris@102: Chris@102: #endif Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::try_push_back(const ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (closed(lk)) return queue_op_status::closed; Chris@102: size_type in_p_1 = inc(in_); Chris@102: if (in_p_1 == out_) // full() Chris@102: { Chris@102: return queue_op_status::full; Chris@102: } Chris@102: push_at(elem, in_p_1, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::try_push_back(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_push_back(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::wait_push_back(const ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (closed(lk)) return queue_op_status::closed; Chris@102: push_at(elem, wait_until_not_full(lk), lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: template Chris@102: queue_op_status sync_bounded_queue::wait_push_back(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return wait_push_back(elem, lk); Chris@102: } Chris@102: Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: bool sync_bounded_queue::try_push(no_block_tag, const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) return false; Chris@102: return try_push(elem, lk); Chris@102: } Chris@102: #endif Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::nonblocking_push_back(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) return queue_op_status::busy; Chris@102: return try_push_back(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: typename sync_bounded_queue::size_type sync_bounded_queue::wait_until_not_full(unique_lock& lk) Chris@102: { Chris@102: for (;;) Chris@102: { Chris@102: throw_if_closed(lk); Chris@102: size_type in_p_1 = inc(in_); Chris@102: if (in_p_1 != out_) // ! full() Chris@102: { Chris@102: return in_p_1; Chris@102: } Chris@102: ++waiting_full_; Chris@102: not_full_.wait(lk); Chris@102: } Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: void sync_bounded_queue::push(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: push_at(elem, wait_until_not_full(lk), lk); Chris@102: } Chris@102: #endif Chris@102: template Chris@102: void sync_bounded_queue::push_back(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: push_at(elem, wait_until_not_full(lk), lk); Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: bool sync_bounded_queue::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock& lk) Chris@102: { Chris@102: throw_if_closed(lk); Chris@102: size_type in_p_1 = inc(in_); Chris@102: if (in_p_1 == out_) // full() Chris@102: { Chris@102: return false; Chris@102: } Chris@102: push_at(boost::move(elem), in_p_1, lk); Chris@102: return true; Chris@102: } Chris@102: Chris@102: template Chris@102: bool sync_bounded_queue::try_push(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_push(boost::move(elem), lk); Chris@102: } Chris@102: #endif Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock& lk) Chris@102: { Chris@102: if (closed(lk)) return queue_op_status::closed; Chris@102: size_type in_p_1 = inc(in_); Chris@102: if (in_p_1 == out_) // full() Chris@102: { Chris@102: return queue_op_status::full; Chris@102: } Chris@102: push_at(boost::move(elem), in_p_1, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: template Chris@102: queue_op_status sync_bounded_queue::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_push_back(boost::move(elem), lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_bounded_queue::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock& lk) Chris@102: { Chris@102: if (closed(lk)) return queue_op_status::closed; Chris@102: push_at(boost::move(elem), wait_until_not_full(lk), lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: template Chris@102: queue_op_status sync_bounded_queue::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: return try_push_back(boost::move(elem), lk); Chris@102: } Chris@102: Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: bool sync_bounded_queue::try_push(no_block_tag, BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) Chris@102: { Chris@102: return false; Chris@102: } Chris@102: return try_push(boost::move(elem), lk); Chris@102: } Chris@102: #endif Chris@102: template Chris@102: queue_op_status sync_bounded_queue::nonblocking_push_back(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) Chris@102: { Chris@102: return queue_op_status::busy; Chris@102: } Chris@102: return try_push_back(boost::move(elem), lk); Chris@102: } Chris@102: Chris@102: #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD Chris@102: template Chris@102: void sync_bounded_queue::push(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: push_at(boost::move(elem), wait_until_not_full(lk), lk); Chris@102: } Chris@102: #endif Chris@102: template Chris@102: void sync_bounded_queue::push_back(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(mtx_); Chris@102: push_at(boost::move(elem), wait_until_not_full(lk), lk); Chris@102: } Chris@102: Chris@102: template Chris@102: sync_bounded_queue& operator<<(sync_bounded_queue& sbq, BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: sbq.push_back(boost::move(elem)); Chris@102: return sbq; Chris@102: } Chris@102: Chris@102: template Chris@102: sync_bounded_queue& operator<<(sync_bounded_queue& sbq, ValueType const&elem) Chris@102: { Chris@102: sbq.push_back(elem); Chris@102: return sbq; Chris@102: } Chris@102: Chris@102: template Chris@102: sync_bounded_queue& operator>>(sync_bounded_queue& sbq, ValueType &elem) Chris@102: { Chris@102: sbq.pull_front(elem); Chris@102: return sbq; Chris@102: } Chris@102: } Chris@102: using concurrent::sync_bounded_queue; Chris@102: Chris@102: } Chris@102: Chris@102: #include Chris@102: Chris@102: #endif