Chris@102: #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP Chris@102: #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP Chris@102: Chris@102: ////////////////////////////////////////////////////////////////////////////// Chris@102: // Chris@102: // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost Chris@102: // Software License, Version 1.0. (See accompanying file Chris@102: // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@102: // Chris@102: // See http://www.boost.org/libs/thread for documentation. Chris@102: // Chris@102: ////////////////////////////////////////////////////////////////////////////// Chris@102: Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: Chris@102: #include Chris@102: Chris@102: namespace boost Chris@102: { Chris@102: namespace concurrent Chris@102: { Chris@102: template > Chris@102: class sync_queue Chris@102: : public detail::sync_queue_base Chris@102: { Chris@102: typedef detail::sync_queue_base super; Chris@102: Chris@102: public: Chris@102: typedef ValueType value_type; Chris@102: //typedef typename super::value_type value_type; // fixme Chris@102: typedef typename super::underlying_queue_type underlying_queue_type; Chris@102: typedef typename super::size_type size_type; Chris@102: typedef typename super::op_status op_status; Chris@102: Chris@102: // Constructors/Assignment/Destructors Chris@102: BOOST_THREAD_NO_COPYABLE(sync_queue) Chris@102: inline sync_queue(); Chris@102: //template Chris@102: //inline explicit sync_queue(Range range); Chris@102: inline ~sync_queue(); Chris@102: Chris@102: // Modifiers Chris@102: Chris@102: inline void push(const value_type& x); Chris@102: inline queue_op_status try_push(const value_type& x); Chris@102: inline queue_op_status nonblocking_push(const value_type& x); Chris@102: inline queue_op_status wait_push(const value_type& x); Chris@102: inline void push(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x); Chris@102: inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x); Chris@102: Chris@102: // Observers/Modifiers Chris@102: inline void pull(value_type&); Chris@102: // enable_if is_nothrow_copy_movable Chris@102: inline value_type pull(); Chris@102: Chris@102: inline queue_op_status try_pull(value_type&); Chris@102: inline queue_op_status nonblocking_pull(value_type&); Chris@102: inline queue_op_status wait_pull(ValueType& elem); Chris@102: Chris@102: private: Chris@102: Chris@102: inline queue_op_status try_pull(value_type& x, unique_lock& lk); Chris@102: inline queue_op_status wait_pull(value_type& x, unique_lock& lk); Chris@102: inline queue_op_status try_push(const value_type& x, unique_lock& lk); Chris@102: inline queue_op_status wait_push(const value_type& x, unique_lock& lk); Chris@102: inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock& lk); Chris@102: inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock& lk); Chris@102: Chris@102: inline void pull(value_type& elem, unique_lock& ) Chris@102: { Chris@102: elem = boost::move(super::data_.front()); Chris@102: super::data_.pop_front(); Chris@102: } Chris@102: inline value_type pull(unique_lock& ) Chris@102: { Chris@102: value_type e = boost::move(super::data_.front()); Chris@102: super::data_.pop_front(); Chris@102: return boost::move(e); Chris@102: } Chris@102: Chris@102: inline void push(const value_type& elem, unique_lock& lk) Chris@102: { Chris@102: super::data_.push_back(elem); Chris@102: super::notify_not_empty_if_needed(lk); Chris@102: } Chris@102: Chris@102: inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock& lk) Chris@102: { Chris@102: super::data_.push_back(boost::move(elem)); Chris@102: super::notify_not_empty_if_needed(lk); Chris@102: } Chris@102: }; Chris@102: Chris@102: template Chris@102: sync_queue::sync_queue() : Chris@102: super() Chris@102: { Chris@102: } Chris@102: Chris@102: // template Chris@102: // template Chris@102: // explicit sync_queue::sync_queue(Range range) : Chris@102: // data_(), closed_(false) Chris@102: // { Chris@102: // try Chris@102: // { Chris@102: // typedef typename Range::iterator iterator_t; Chris@102: // iterator_t first = boost::begin(range); Chris@102: // iterator_t end = boost::end(range); Chris@102: // for (iterator_t cur = first; cur != end; ++cur) Chris@102: // { Chris@102: // data_.push(boost::move(*cur));; Chris@102: // } Chris@102: // notify_not_empty_if_needed(lk); Chris@102: // } Chris@102: // catch (...) Chris@102: // { Chris@102: // delete[] data_; Chris@102: // } Chris@102: // } Chris@102: Chris@102: template Chris@102: sync_queue::~sync_queue() Chris@102: { Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::try_pull(ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (super::empty(lk)) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: return queue_op_status::empty; Chris@102: } Chris@102: pull(elem, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: template Chris@102: queue_op_status sync_queue::wait_pull(ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (super::empty(lk)) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: } Chris@102: bool has_been_closed = super::wait_until_not_empty_or_closed(lk); Chris@102: if (has_been_closed) return queue_op_status::closed; Chris@102: pull(elem, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::try_pull(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: return try_pull(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::wait_pull(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: return wait_pull(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::nonblocking_pull(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) Chris@102: { Chris@102: return queue_op_status::busy; Chris@102: } Chris@102: return try_pull(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_queue::pull(ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: super::wait_until_not_empty(lk); Chris@102: pull(elem, lk); Chris@102: } Chris@102: Chris@102: // enable if ValueType is nothrow movable Chris@102: template Chris@102: ValueType sync_queue::pull() Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: super::wait_until_not_empty(lk); Chris@102: return pull(lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::try_push(const ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: push(elem, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::try_push(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: return try_push(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::wait_push(const ValueType& elem, unique_lock& lk) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: push(elem, lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::wait_push(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: return wait_push(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::nonblocking_push(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) return queue_op_status::busy; Chris@102: return try_push(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_queue::push(const ValueType& elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: super::throw_if_closed(lk); Chris@102: push(elem, lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock& lk) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: push(boost::move(elem), lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::try_push(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: return try_push(boost::move(elem), lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::wait_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock& lk) Chris@102: { Chris@102: if (super::closed(lk)) return queue_op_status::closed; Chris@102: push(boost::move(elem), lk); Chris@102: return queue_op_status::success; Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::wait_push(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: return wait_push(boost::move(elem), lk); Chris@102: } Chris@102: Chris@102: template Chris@102: queue_op_status sync_queue::nonblocking_push(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_, try_to_lock); Chris@102: if (!lk.owns_lock()) Chris@102: { Chris@102: return queue_op_status::busy; Chris@102: } Chris@102: return try_push(boost::move(elem), lk); Chris@102: } Chris@102: Chris@102: template Chris@102: void sync_queue::push(BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: unique_lock lk(super::mtx_); Chris@102: super::throw_if_closed(lk); Chris@102: push(boost::move(elem), lk); Chris@102: } Chris@102: Chris@102: template Chris@102: sync_queue& operator<<(sync_queue& sbq, BOOST_THREAD_RV_REF(ValueType) elem) Chris@102: { Chris@102: sbq.push(boost::move(elem)); Chris@102: return sbq; Chris@102: } Chris@102: Chris@102: template Chris@102: sync_queue& operator<<(sync_queue& sbq, ValueType const&elem) Chris@102: { Chris@102: sbq.push(elem); Chris@102: return sbq; Chris@102: } Chris@102: Chris@102: template Chris@102: sync_queue& operator>>(sync_queue& sbq, ValueType &elem) Chris@102: { Chris@102: sbq.pull(elem); Chris@102: return sbq; Chris@102: } Chris@102: Chris@102: } Chris@102: using concurrent::sync_queue; Chris@102: Chris@102: } Chris@102: Chris@102: #include Chris@102: Chris@102: #endif