Chris@16: // lock-free single-producer/single-consumer ringbuffer Chris@16: // this algorithm is implemented in various projects (linux kernel) Chris@16: // Chris@16: // Copyright (C) 2009-2013 Tim Blechmann Chris@16: // Chris@16: // Distributed under the Boost Software License, Version 1.0. (See Chris@16: // accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED Chris@16: #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED Chris@16: Chris@16: #include Chris@16: #include Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@101: #include Chris@16: Chris@16: #include Chris@101: #include Chris@16: Chris@16: #include Chris@16: #include Chris@101: #include Chris@16: #include Chris@16: #include Chris@16: Chris@101: #ifdef BOOST_HAS_PRAGMA_ONCE Chris@101: #pragma once Chris@101: #endif Chris@16: Chris@16: namespace boost { Chris@16: namespace lockfree { Chris@16: namespace detail { Chris@16: Chris@16: typedef parameter::parameters, Chris@16: boost::parameter::optional Chris@16: > ringbuffer_signature; Chris@16: Chris@16: template Chris@16: class ringbuffer_base Chris@16: { Chris@16: #ifndef BOOST_DOXYGEN_INVOKED Chris@101: protected: Chris@16: typedef std::size_t size_t; Chris@16: static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t); Chris@16: atomic write_index_; Chris@16: char padding1[padding_size]; /* force read_index and write_index to different cache lines */ Chris@16: atomic read_index_; Chris@16: Chris@101: BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&)) Chris@101: BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&)) Chris@16: Chris@16: protected: Chris@16: ringbuffer_base(void): Chris@16: write_index_(0), read_index_(0) Chris@16: {} Chris@16: Chris@16: static size_t next_index(size_t arg, size_t max_size) Chris@16: { Chris@16: size_t ret = arg + 1; Chris@16: while (unlikely(ret >= max_size)) Chris@16: ret -= max_size; Chris@16: return ret; Chris@16: } Chris@16: Chris@16: static size_t read_available(size_t write_index, size_t read_index, size_t max_size) Chris@16: { Chris@16: if (write_index >= read_index) Chris@16: return write_index - read_index; Chris@16: Chris@101: const size_t ret = write_index + max_size - read_index; Chris@16: return ret; Chris@16: } Chris@16: Chris@16: static size_t write_available(size_t write_index, size_t read_index, size_t max_size) Chris@16: { Chris@16: size_t ret = read_index - write_index - 1; Chris@16: if (write_index >= read_index) Chris@16: ret += max_size; Chris@16: return ret; Chris@16: } Chris@16: Chris@101: size_t read_available(size_t max_size) const Chris@101: { Chris@101: size_t write_index = write_index_.load(memory_order_relaxed); Chris@101: const size_t read_index = read_index_.load(memory_order_relaxed); Chris@101: return read_available(write_index, read_index, max_size); Chris@101: } Chris@101: Chris@101: size_t write_available(size_t max_size) const Chris@101: { Chris@101: size_t write_index = write_index_.load(memory_order_relaxed); Chris@101: const size_t read_index = read_index_.load(memory_order_relaxed); Chris@101: return write_available(write_index, read_index, max_size); Chris@101: } Chris@101: Chris@16: bool push(T const & t, T * buffer, size_t max_size) Chris@16: { Chris@16: const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread Chris@16: const size_t next = next_index(write_index, max_size); Chris@16: Chris@16: if (next == read_index_.load(memory_order_acquire)) Chris@16: return false; /* ringbuffer is full */ Chris@16: Chris@16: new (buffer + write_index) T(t); // copy-construct Chris@16: Chris@16: write_index_.store(next, memory_order_release); Chris@16: Chris@16: return true; Chris@16: } Chris@16: Chris@16: size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size) Chris@16: { Chris@16: return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer; Chris@16: } Chris@16: Chris@16: template Chris@16: ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size) Chris@16: { Chris@16: // FIXME: avoid std::distance Chris@16: Chris@16: const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread Chris@16: const size_t read_index = read_index_.load(memory_order_acquire); Chris@16: const size_t avail = write_available(write_index, read_index, max_size); Chris@16: Chris@16: if (avail == 0) Chris@16: return begin; Chris@16: Chris@16: size_t input_count = std::distance(begin, end); Chris@16: input_count = (std::min)(input_count, avail); Chris@16: Chris@16: size_t new_write_index = write_index + input_count; Chris@16: Chris@16: const ConstIterator last = boost::next(begin, input_count); Chris@16: Chris@16: if (write_index + input_count > max_size) { Chris@16: /* copy data in two sections */ Chris@16: const size_t count0 = max_size - write_index; Chris@16: const ConstIterator midpoint = boost::next(begin, count0); Chris@16: Chris@16: std::uninitialized_copy(begin, midpoint, internal_buffer + write_index); Chris@16: std::uninitialized_copy(midpoint, last, internal_buffer); Chris@16: new_write_index -= max_size; Chris@16: } else { Chris@16: std::uninitialized_copy(begin, last, internal_buffer + write_index); Chris@16: Chris@16: if (new_write_index == max_size) Chris@16: new_write_index = 0; Chris@16: } Chris@16: Chris@16: write_index_.store(new_write_index, memory_order_release); Chris@16: return last; Chris@16: } Chris@16: Chris@101: template Chris@101: bool consume_one(Functor & functor, T * buffer, size_t max_size) Chris@16: { Chris@16: const size_t write_index = write_index_.load(memory_order_acquire); Chris@16: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@101: if ( empty(write_index, read_index) ) Chris@16: return false; Chris@16: Chris@101: T & object_to_consume = buffer[read_index]; Chris@101: functor( object_to_consume ); Chris@101: object_to_consume.~T(); Chris@16: Chris@16: size_t next = next_index(read_index, max_size); Chris@16: read_index_.store(next, memory_order_release); Chris@16: return true; Chris@16: } Chris@16: Chris@101: template Chris@101: bool consume_one(Functor const & functor, T * buffer, size_t max_size) Chris@101: { Chris@101: const size_t write_index = write_index_.load(memory_order_acquire); Chris@101: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@101: if ( empty(write_index, read_index) ) Chris@101: return false; Chris@101: Chris@101: T & object_to_consume = buffer[read_index]; Chris@101: functor( object_to_consume ); Chris@101: object_to_consume.~T(); Chris@101: Chris@101: size_t next = next_index(read_index, max_size); Chris@101: read_index_.store(next, memory_order_release); Chris@101: return true; Chris@101: } Chris@101: Chris@101: template Chris@101: size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size) Chris@101: { Chris@101: const size_t write_index = write_index_.load(memory_order_acquire); Chris@101: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@101: Chris@101: const size_t avail = read_available(write_index, read_index, max_size); Chris@101: Chris@101: if (avail == 0) Chris@101: return 0; Chris@101: Chris@101: const size_t output_count = avail; Chris@101: Chris@101: size_t new_read_index = read_index + output_count; Chris@101: Chris@101: if (read_index + output_count > max_size) { Chris@101: /* copy data in two sections */ Chris@101: const size_t count0 = max_size - read_index; Chris@101: const size_t count1 = output_count - count0; Chris@101: Chris@101: run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); Chris@101: run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); Chris@101: Chris@101: new_read_index -= max_size; Chris@101: } else { Chris@101: run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); Chris@101: Chris@101: if (new_read_index == max_size) Chris@101: new_read_index = 0; Chris@101: } Chris@101: Chris@101: read_index_.store(new_read_index, memory_order_release); Chris@101: return output_count; Chris@101: } Chris@101: Chris@101: template Chris@101: size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size) Chris@101: { Chris@101: const size_t write_index = write_index_.load(memory_order_acquire); Chris@101: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@101: Chris@101: const size_t avail = read_available(write_index, read_index, max_size); Chris@101: Chris@101: if (avail == 0) Chris@101: return 0; Chris@101: Chris@101: const size_t output_count = avail; Chris@101: Chris@101: size_t new_read_index = read_index + output_count; Chris@101: Chris@101: if (read_index + output_count > max_size) { Chris@101: /* copy data in two sections */ Chris@101: const size_t count0 = max_size - read_index; Chris@101: const size_t count1 = output_count - count0; Chris@101: Chris@101: run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); Chris@101: run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); Chris@101: Chris@101: new_read_index -= max_size; Chris@101: } else { Chris@101: run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); Chris@101: Chris@101: if (new_read_index == max_size) Chris@101: new_read_index = 0; Chris@101: } Chris@101: Chris@101: read_index_.store(new_read_index, memory_order_release); Chris@101: return output_count; Chris@101: } Chris@101: Chris@16: size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size) Chris@16: { Chris@16: const size_t write_index = write_index_.load(memory_order_acquire); Chris@16: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@16: Chris@16: const size_t avail = read_available(write_index, read_index, max_size); Chris@16: Chris@16: if (avail == 0) Chris@16: return 0; Chris@16: Chris@16: output_count = (std::min)(output_count, avail); Chris@16: Chris@16: size_t new_read_index = read_index + output_count; Chris@16: Chris@16: if (read_index + output_count > max_size) { Chris@16: /* copy data in two sections */ Chris@16: const size_t count0 = max_size - read_index; Chris@16: const size_t count1 = output_count - count0; Chris@16: Chris@16: copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer); Chris@16: copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0); Chris@16: Chris@16: new_read_index -= max_size; Chris@16: } else { Chris@16: copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer); Chris@16: if (new_read_index == max_size) Chris@16: new_read_index = 0; Chris@16: } Chris@16: Chris@16: read_index_.store(new_read_index, memory_order_release); Chris@16: return output_count; Chris@16: } Chris@16: Chris@16: template Chris@101: size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size) Chris@16: { Chris@16: const size_t write_index = write_index_.load(memory_order_acquire); Chris@16: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@16: Chris@16: const size_t avail = read_available(write_index, read_index, max_size); Chris@16: if (avail == 0) Chris@16: return 0; Chris@16: Chris@16: size_t new_read_index = read_index + avail; Chris@16: Chris@16: if (read_index + avail > max_size) { Chris@16: /* copy data in two sections */ Chris@16: const size_t count0 = max_size - read_index; Chris@16: const size_t count1 = avail - count0; Chris@16: Chris@16: it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it); Chris@16: copy_and_delete(internal_buffer, internal_buffer + count1, it); Chris@16: Chris@16: new_read_index -= max_size; Chris@16: } else { Chris@16: copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it); Chris@16: if (new_read_index == max_size) Chris@16: new_read_index = 0; Chris@16: } Chris@16: Chris@16: read_index_.store(new_read_index, memory_order_release); Chris@16: return avail; Chris@16: } Chris@101: Chris@101: const T& front(const T * internal_buffer) const Chris@101: { Chris@101: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@101: return *(internal_buffer + read_index); Chris@101: } Chris@101: Chris@101: T& front(T * internal_buffer) Chris@101: { Chris@101: const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread Chris@101: return *(internal_buffer + read_index); Chris@101: } Chris@16: #endif Chris@16: Chris@16: Chris@16: public: Chris@16: /** reset the ringbuffer Chris@16: * Chris@16: * \note Not thread-safe Chris@16: * */ Chris@16: void reset(void) Chris@16: { Chris@101: if ( !boost::has_trivial_destructor::value ) { Chris@101: // make sure to call all destructors! Chris@101: Chris@101: T dummy_element; Chris@101: while (pop(dummy_element)) Chris@101: {} Chris@101: } else { Chris@101: write_index_.store(0, memory_order_relaxed); Chris@101: read_index_.store(0, memory_order_release); Chris@101: } Chris@16: } Chris@16: Chris@16: /** Check if the ringbuffer is empty Chris@16: * Chris@16: * \return true, if the ringbuffer is empty, false otherwise Chris@16: * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate. Chris@16: * */ Chris@16: bool empty(void) Chris@16: { Chris@16: return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed)); Chris@16: } Chris@16: Chris@16: /** Chris@16: * \return true, if implementation is lock-free. Chris@16: * Chris@16: * */ Chris@16: bool is_lock_free(void) const Chris@16: { Chris@16: return write_index_.is_lock_free() && read_index_.is_lock_free(); Chris@16: } Chris@16: Chris@16: private: Chris@16: bool empty(size_t write_index, size_t read_index) Chris@16: { Chris@16: return write_index == read_index; Chris@16: } Chris@16: Chris@16: template< class OutputIterator > Chris@16: OutputIterator copy_and_delete( T * first, T * last, OutputIterator out ) Chris@16: { Chris@16: if (boost::has_trivial_destructor::value) { Chris@16: return std::copy(first, last, out); // will use memcpy if possible Chris@16: } else { Chris@16: for (; first != last; ++first, ++out) { Chris@16: *out = *first; Chris@16: first->~T(); Chris@16: } Chris@16: return out; Chris@16: } Chris@16: } Chris@101: Chris@101: template< class Functor > Chris@101: void run_functor_and_delete( T * first, T * last, Functor & functor ) Chris@101: { Chris@101: for (; first != last; ++first) { Chris@101: functor(*first); Chris@101: first->~T(); Chris@101: } Chris@101: } Chris@101: Chris@101: template< class Functor > Chris@101: void run_functor_and_delete( T * first, T * last, Functor const & functor ) Chris@101: { Chris@101: for (; first != last; ++first) { Chris@101: functor(*first); Chris@101: first->~T(); Chris@101: } Chris@101: } Chris@16: }; Chris@16: Chris@16: template Chris@16: class compile_time_sized_ringbuffer: Chris@16: public ringbuffer_base Chris@16: { Chris@16: typedef std::size_t size_type; Chris@16: static const std::size_t max_size = MaxSize + 1; Chris@16: Chris@16: typedef typename boost::aligned_storage::value Chris@16: >::type storage_type; Chris@16: Chris@16: storage_type storage_; Chris@16: Chris@16: T * data() Chris@16: { Chris@16: return static_cast(storage_.address()); Chris@16: } Chris@16: Chris@101: const T * data() const Chris@101: { Chris@101: return static_cast(storage_.address()); Chris@101: } Chris@101: Chris@101: protected: Chris@101: size_type max_number_of_elements() const Chris@101: { Chris@101: return max_size; Chris@101: } Chris@101: Chris@16: public: Chris@16: bool push(T const & t) Chris@16: { Chris@16: return ringbuffer_base::push(t, data(), max_size); Chris@16: } Chris@16: Chris@101: template Chris@101: bool consume_one(Functor & f) Chris@16: { Chris@101: return ringbuffer_base::consume_one(f, data(), max_size); Chris@101: } Chris@101: Chris@101: template Chris@101: bool consume_one(Functor const & f) Chris@101: { Chris@101: return ringbuffer_base::consume_one(f, data(), max_size); Chris@101: } Chris@101: Chris@101: template Chris@101: size_type consume_all(Functor & f) Chris@101: { Chris@101: return ringbuffer_base::consume_all(f, data(), max_size); Chris@101: } Chris@101: Chris@101: template Chris@101: size_type consume_all(Functor const & f) Chris@101: { Chris@101: return ringbuffer_base::consume_all(f, data(), max_size); Chris@16: } Chris@16: Chris@16: size_type push(T const * t, size_type size) Chris@16: { Chris@16: return ringbuffer_base::push(t, size, data(), max_size); Chris@16: } Chris@16: Chris@16: template Chris@16: size_type push(T const (&t)[size]) Chris@16: { Chris@16: return push(t, size); Chris@16: } Chris@16: Chris@16: template Chris@16: ConstIterator push(ConstIterator begin, ConstIterator end) Chris@16: { Chris@16: return ringbuffer_base::push(begin, end, data(), max_size); Chris@16: } Chris@16: Chris@16: size_type pop(T * ret, size_type size) Chris@16: { Chris@16: return ringbuffer_base::pop(ret, size, data(), max_size); Chris@16: } Chris@16: Chris@101: template Chris@101: size_type pop_to_output_iterator(OutputIterator it) Chris@16: { Chris@101: return ringbuffer_base::pop_to_output_iterator(it, data(), max_size); Chris@16: } Chris@16: Chris@101: const T& front(void) const Chris@16: { Chris@101: return ringbuffer_base::front(data()); Chris@101: } Chris@101: Chris@101: T& front(void) Chris@101: { Chris@101: return ringbuffer_base::front(data()); Chris@16: } Chris@16: }; Chris@16: Chris@16: template Chris@16: class runtime_sized_ringbuffer: Chris@16: public ringbuffer_base, Chris@16: private Alloc Chris@16: { Chris@16: typedef std::size_t size_type; Chris@16: size_type max_elements_; Chris@16: typedef typename Alloc::pointer pointer; Chris@16: pointer array_; Chris@16: Chris@101: protected: Chris@101: size_type max_number_of_elements() const Chris@101: { Chris@101: return max_elements_; Chris@101: } Chris@101: Chris@16: public: Chris@16: explicit runtime_sized_ringbuffer(size_type max_elements): Chris@16: max_elements_(max_elements + 1) Chris@16: { Chris@16: array_ = Alloc::allocate(max_elements_); Chris@16: } Chris@16: Chris@16: template Chris@16: runtime_sized_ringbuffer(typename Alloc::template rebind::other const & alloc, size_type max_elements): Chris@16: Alloc(alloc), max_elements_(max_elements + 1) Chris@16: { Chris@16: array_ = Alloc::allocate(max_elements_); Chris@16: } Chris@16: Chris@16: runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements): Chris@16: Alloc(alloc), max_elements_(max_elements + 1) Chris@16: { Chris@16: array_ = Alloc::allocate(max_elements_); Chris@16: } Chris@16: Chris@16: ~runtime_sized_ringbuffer(void) Chris@16: { Chris@16: // destroy all remaining items Chris@16: T out; Chris@101: while (pop(&out, 1)) {} Chris@16: Chris@16: Alloc::deallocate(array_, max_elements_); Chris@16: } Chris@16: Chris@16: bool push(T const & t) Chris@16: { Chris@16: return ringbuffer_base::push(t, &*array_, max_elements_); Chris@16: } Chris@16: Chris@101: template Chris@101: bool consume_one(Functor & f) Chris@16: { Chris@101: return ringbuffer_base::consume_one(f, &*array_, max_elements_); Chris@101: } Chris@101: Chris@101: template Chris@101: bool consume_one(Functor const & f) Chris@101: { Chris@101: return ringbuffer_base::consume_one(f, &*array_, max_elements_); Chris@101: } Chris@101: Chris@101: template Chris@101: size_type consume_all(Functor & f) Chris@101: { Chris@101: return ringbuffer_base::consume_all(f, &*array_, max_elements_); Chris@101: } Chris@101: Chris@101: template Chris@101: size_type consume_all(Functor const & f) Chris@101: { Chris@101: return ringbuffer_base::consume_all(f, &*array_, max_elements_); Chris@16: } Chris@16: Chris@16: size_type push(T const * t, size_type size) Chris@16: { Chris@16: return ringbuffer_base::push(t, size, &*array_, max_elements_); Chris@16: } Chris@16: Chris@16: template Chris@16: size_type push(T const (&t)[size]) Chris@16: { Chris@16: return push(t, size); Chris@16: } Chris@16: Chris@16: template Chris@16: ConstIterator push(ConstIterator begin, ConstIterator end) Chris@16: { Chris@16: return ringbuffer_base::push(begin, end, array_, max_elements_); Chris@16: } Chris@16: Chris@16: size_type pop(T * ret, size_type size) Chris@16: { Chris@16: return ringbuffer_base::pop(ret, size, array_, max_elements_); Chris@16: } Chris@16: Chris@101: template Chris@101: size_type pop_to_output_iterator(OutputIterator it) Chris@16: { Chris@101: return ringbuffer_base::pop_to_output_iterator(it, array_, max_elements_); Chris@16: } Chris@16: Chris@101: const T& front(void) const Chris@16: { Chris@101: return ringbuffer_base::front(array_); Chris@101: } Chris@101: Chris@101: T& front(void) Chris@101: { Chris@101: return ringbuffer_base::front(array_); Chris@16: } Chris@16: }; Chris@16: Chris@16: template Chris@16: struct make_ringbuffer Chris@16: { Chris@16: typedef typename ringbuffer_signature::bind::type bound_args; Chris@16: Chris@16: typedef extract_capacity extract_capacity_t; Chris@16: Chris@16: static const bool runtime_sized = !extract_capacity_t::has_capacity; Chris@16: static const size_t capacity = extract_capacity_t::capacity; Chris@16: Chris@16: typedef extract_allocator extract_allocator_t; Chris@16: typedef typename extract_allocator_t::type allocator; Chris@16: Chris@16: // allocator argument is only sane, for run-time sized ringbuffers Chris@16: BOOST_STATIC_ASSERT((mpl::if_, Chris@16: mpl::bool_, Chris@16: mpl::true_ Chris@16: >::type::value)); Chris@16: Chris@16: typedef typename mpl::if_c, Chris@16: compile_time_sized_ringbuffer Chris@16: >::type ringbuffer_type; Chris@16: }; Chris@16: Chris@16: Chris@16: } /* namespace detail */ Chris@16: Chris@16: Chris@16: /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free. Chris@16: * Chris@16: * \b Policies: Chris@16: * - \c boost::lockfree::capacity<>, optional
Chris@16: * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time. Chris@16: * Chris@16: * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator>
Chris@16: * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured Chris@16: * to be sized at run-time Chris@16: * Chris@16: * \b Requirements: Chris@16: * - T must have a default constructor Chris@16: * - T must be copyable Chris@16: * */ Chris@16: #ifndef BOOST_DOXYGEN_INVOKED Chris@16: template Chris@16: #else Chris@16: template Chris@16: #endif Chris@16: class spsc_queue: Chris@16: public detail::make_ringbuffer::ringbuffer_type Chris@16: { Chris@16: private: Chris@16: Chris@16: #ifndef BOOST_DOXYGEN_INVOKED Chris@16: typedef typename detail::make_ringbuffer::ringbuffer_type base_type; Chris@16: static const bool runtime_sized = detail::make_ringbuffer::runtime_sized; Chris@16: typedef typename detail::make_ringbuffer::allocator allocator_arg; Chris@16: Chris@16: struct implementation_defined Chris@16: { Chris@16: typedef allocator_arg allocator; Chris@16: typedef std::size_t size_type; Chris@16: }; Chris@16: #endif Chris@16: Chris@16: public: Chris@16: typedef T value_type; Chris@16: typedef typename implementation_defined::allocator allocator; Chris@16: typedef typename implementation_defined::size_type size_type; Chris@16: Chris@16: /** Constructs a spsc_queue Chris@16: * Chris@16: * \pre spsc_queue must be configured to be sized at compile-time Chris@16: */ Chris@16: // @{ Chris@16: spsc_queue(void) Chris@16: { Chris@16: BOOST_ASSERT(!runtime_sized); Chris@16: } Chris@16: Chris@16: template Chris@16: explicit spsc_queue(typename allocator::template rebind::other const & alloc) Chris@16: { Chris@16: // just for API compatibility: we don't actually need an allocator Chris@16: BOOST_STATIC_ASSERT(!runtime_sized); Chris@16: } Chris@16: Chris@16: explicit spsc_queue(allocator const & alloc) Chris@16: { Chris@16: // just for API compatibility: we don't actually need an allocator Chris@16: BOOST_ASSERT(!runtime_sized); Chris@16: } Chris@16: // @} Chris@16: Chris@16: Chris@16: /** Constructs a spsc_queue for element_count elements Chris@16: * Chris@16: * \pre spsc_queue must be configured to be sized at run-time Chris@16: */ Chris@16: // @{ Chris@16: explicit spsc_queue(size_type element_count): Chris@16: base_type(element_count) Chris@16: { Chris@16: BOOST_ASSERT(runtime_sized); Chris@16: } Chris@16: Chris@16: template Chris@16: spsc_queue(size_type element_count, typename allocator::template rebind::other const & alloc): Chris@16: base_type(alloc, element_count) Chris@16: { Chris@16: BOOST_STATIC_ASSERT(runtime_sized); Chris@16: } Chris@16: Chris@16: spsc_queue(size_type element_count, allocator_arg const & alloc): Chris@16: base_type(alloc, element_count) Chris@16: { Chris@16: BOOST_ASSERT(runtime_sized); Chris@16: } Chris@16: // @} Chris@16: Chris@16: /** Pushes object t to the ringbuffer. Chris@16: * Chris@16: * \pre only one thread is allowed to push data to the spsc_queue Chris@16: * \post object will be pushed to the spsc_queue, unless it is full. Chris@16: * \return true, if the push operation is successful. Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: * */ Chris@16: bool push(T const & t) Chris@16: { Chris@16: return base_type::push(t); Chris@16: } Chris@16: Chris@16: /** Pops one object from ringbuffer. Chris@16: * Chris@16: * \pre only one thread is allowed to pop data to the spsc_queue Chris@101: * \post if ringbuffer is not empty, object will be discarded. Chris@101: * \return true, if the pop operation is successful, false if ringbuffer was empty. Chris@101: * Chris@101: * \note Thread-safe and wait-free Chris@101: */ Chris@101: bool pop () Chris@101: { Chris@101: detail::consume_noop consume_functor; Chris@101: return consume_one( consume_functor ); Chris@101: } Chris@101: Chris@101: /** Pops one object from ringbuffer. Chris@101: * Chris@101: * \pre only one thread is allowed to pop data to the spsc_queue Chris@16: * \post if ringbuffer is not empty, object will be copied to ret. Chris@16: * \return true, if the pop operation is successful, false if ringbuffer was empty. Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: */ Chris@101: template Chris@101: typename boost::enable_if::type, bool>::type Chris@101: pop (U & ret) Chris@16: { Chris@101: detail::consume_via_copy consume_functor(ret); Chris@101: return consume_one( consume_functor ); Chris@16: } Chris@16: Chris@16: /** Pushes as many objects from the array t as there is space. Chris@16: * Chris@16: * \pre only one thread is allowed to push data to the spsc_queue Chris@16: * \return number of pushed items Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: */ Chris@16: size_type push(T const * t, size_type size) Chris@16: { Chris@16: return base_type::push(t, size); Chris@16: } Chris@16: Chris@16: /** Pushes as many objects from the array t as there is space available. Chris@16: * Chris@16: * \pre only one thread is allowed to push data to the spsc_queue Chris@16: * \return number of pushed items Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: */ Chris@16: template Chris@16: size_type push(T const (&t)[size]) Chris@16: { Chris@16: return push(t, size); Chris@16: } Chris@16: Chris@16: /** Pushes as many objects from the range [begin, end) as there is space . Chris@16: * Chris@16: * \pre only one thread is allowed to push data to the spsc_queue Chris@16: * \return iterator to the first element, which has not been pushed Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: */ Chris@16: template Chris@16: ConstIterator push(ConstIterator begin, ConstIterator end) Chris@16: { Chris@16: return base_type::push(begin, end); Chris@16: } Chris@16: Chris@16: /** Pops a maximum of size objects from ringbuffer. Chris@16: * Chris@16: * \pre only one thread is allowed to pop data to the spsc_queue Chris@16: * \return number of popped items Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: * */ Chris@16: size_type pop(T * ret, size_type size) Chris@16: { Chris@16: return base_type::pop(ret, size); Chris@16: } Chris@16: Chris@16: /** Pops a maximum of size objects from spsc_queue. Chris@16: * Chris@16: * \pre only one thread is allowed to pop data to the spsc_queue Chris@16: * \return number of popped items Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: * */ Chris@16: template Chris@16: size_type pop(T (&ret)[size]) Chris@16: { Chris@16: return pop(ret, size); Chris@16: } Chris@16: Chris@16: /** Pops objects to the output iterator it Chris@16: * Chris@16: * \pre only one thread is allowed to pop data to the spsc_queue Chris@16: * \return number of popped items Chris@16: * Chris@16: * \note Thread-safe and wait-free Chris@16: * */ Chris@16: template Chris@101: typename boost::disable_if::type, size_type>::type Chris@101: pop(OutputIterator it) Chris@16: { Chris@101: return base_type::pop_to_output_iterator(it); Chris@16: } Chris@16: Chris@16: /** consumes one element via a functor Chris@16: * Chris@16: * pops one element from the queue and applies the functor on this object Chris@16: * Chris@16: * \returns true, if one element was consumed Chris@16: * Chris@16: * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking Chris@16: * */ Chris@16: template Chris@16: bool consume_one(Functor & f) Chris@16: { Chris@101: return base_type::consume_one(f); Chris@16: } Chris@16: Chris@16: /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs) Chris@16: template Chris@16: bool consume_one(Functor const & f) Chris@16: { Chris@101: return base_type::consume_one(f); Chris@16: } Chris@16: Chris@16: /** consumes all elements via a functor Chris@16: * Chris@16: * sequentially pops all elements from the queue and applies the functor on each object Chris@16: * Chris@16: * \returns number of elements that are consumed Chris@16: * Chris@16: * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking Chris@16: * */ Chris@16: template Chris@16: size_type consume_all(Functor & f) Chris@16: { Chris@101: return base_type::consume_all(f); Chris@16: } Chris@16: Chris@16: /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs) Chris@16: template Chris@16: size_type consume_all(Functor const & f) Chris@16: { Chris@101: return base_type::consume_all(f); Chris@101: } Chris@16: Chris@101: /** get number of elements that are available for read Chris@101: * Chris@101: * \return number of available elements that can be popped from the spsc_queue Chris@101: * Chris@101: * \note Thread-safe and wait-free, should only be called from the producer thread Chris@101: * */ Chris@101: size_type read_available() const Chris@101: { Chris@101: return base_type::read_available(base_type::max_number_of_elements()); Chris@16: } Chris@101: Chris@101: /** get write space to write elements Chris@101: * Chris@101: * \return number of elements that can be pushed to the spsc_queue Chris@101: * Chris@101: * \note Thread-safe and wait-free, should only be called from the consumer thread Chris@101: * */ Chris@101: size_type write_available() const Chris@101: { Chris@101: return base_type::write_available(base_type::max_number_of_elements()); Chris@101: } Chris@101: Chris@101: /** get reference to element in the front of the queue Chris@101: * Chris@101: * Availability of front element can be checked using read_available(). Chris@101: * Chris@101: * \pre only one thread is allowed to check front element Chris@101: * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method. Chris@101: * \return reference to the first element in the queue Chris@101: * Chris@101: * \note Thread-safe and wait-free Chris@101: */ Chris@101: const T& front() const Chris@101: { Chris@101: BOOST_ASSERT(read_available() > 0); Chris@101: return base_type::front(); Chris@101: } Chris@101: Chris@101: /// \copydoc boost::lockfree::spsc_queue::front() const Chris@101: T& front() Chris@101: { Chris@101: BOOST_ASSERT(read_available() > 0); Chris@101: return base_type::front(); Chris@101: } Chris@101: Chris@101: /** reset the ringbuffer Chris@101: * Chris@101: * \note Not thread-safe Chris@101: * */ Chris@101: void reset(void) Chris@101: { Chris@101: if ( !boost::has_trivial_destructor::value ) { Chris@101: // make sure to call all destructors! Chris@101: Chris@101: T dummy_element; Chris@101: while (pop(dummy_element)) Chris@101: {} Chris@101: } else { Chris@101: base_type::write_index_.store(0, memory_order_relaxed); Chris@101: base_type::read_index_.store(0, memory_order_release); Chris@101: } Chris@101: } Chris@16: }; Chris@16: Chris@16: } /* namespace lockfree */ Chris@16: } /* namespace boost */ Chris@16: Chris@16: Chris@16: #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */