Chris@16: // lock-free queue from Chris@16: // Michael, M. M. and Scott, M. L., Chris@16: // "simple, fast and practical non-blocking and blocking concurrent queue algorithms" Chris@16: // Chris@16: // Copyright (C) 2008-2013 Tim Blechmann Chris@16: // Chris@16: // Distributed under the Boost Software License, Version 1.0. (See Chris@16: // accompanying file LICENSE_1_0.txt or copy at Chris@16: // http://www.boost.org/LICENSE_1_0.txt) Chris@16: Chris@16: #ifndef BOOST_LOCKFREE_FIFO_HPP_INCLUDED Chris@16: #define BOOST_LOCKFREE_FIFO_HPP_INCLUDED Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: Chris@101: #ifdef BOOST_HAS_PRAGMA_ONCE Chris@101: #pragma once Chris@101: #endif Chris@101: Chris@101: Chris@16: #if defined(_MSC_VER) Chris@16: #pragma warning(push) Chris@16: #pragma warning(disable: 4324) // structure was padded due to __declspec(align()) Chris@16: #endif Chris@16: Chris@16: Chris@16: namespace boost { Chris@16: namespace lockfree { Chris@16: namespace detail { Chris@16: Chris@16: typedef parameter::parameters, Chris@16: boost::parameter::optional Chris@16: > queue_signature; Chris@16: Chris@16: } /* namespace detail */ Chris@16: Chris@16: Chris@16: /** The queue class provides a multi-writer/multi-reader queue, pushing and popping is lock-free, Chris@16: * construction/destruction has to be synchronized. It uses a freelist for memory management, Chris@16: * freed nodes are pushed to the freelist and not returned to the OS before the queue is destroyed. Chris@16: * Chris@16: * \b Policies: Chris@16: * - \ref boost::lockfree::fixed_sized, defaults to \c boost::lockfree::fixed_sized \n Chris@16: * Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. \n Chris@16: * If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed Chris@16: * by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index Chris@16: * type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way Chris@16: * to achieve lock-freedom. Chris@16: * Chris@16: * - \ref boost::lockfree::capacity, optional \n Chris@16: * If this template argument is passed to the options, the size of the queue is set at compile-time.\n Chris@16: * It this option implies \c fixed_sized Chris@16: * Chris@16: * - \ref boost::lockfree::allocator, defaults to \c boost::lockfree::allocator> \n Chris@16: * Specifies the allocator that is used for the internal freelist Chris@16: * Chris@16: * \b Requirements: Chris@16: * - T must have a copy constructor Chris@16: * - T must have a trivial assignment operator Chris@16: * - T must have a trivial destructor Chris@16: * Chris@16: * */ Chris@16: #ifndef BOOST_DOXYGEN_INVOKED Chris@16: template Chris@16: #else Chris@16: template Chris@16: #endif Chris@16: class queue Chris@16: { Chris@16: private: Chris@16: #ifndef BOOST_DOXYGEN_INVOKED Chris@16: Chris@16: #ifdef BOOST_HAS_TRIVIAL_DESTRUCTOR Chris@16: BOOST_STATIC_ASSERT((boost::has_trivial_destructor::value)); Chris@16: #endif Chris@16: Chris@16: #ifdef BOOST_HAS_TRIVIAL_ASSIGN Chris@16: BOOST_STATIC_ASSERT((boost::has_trivial_assign::value)); Chris@16: #endif Chris@16: Chris@16: typedef typename detail::queue_signature::bind::type bound_args; Chris@16: Chris@16: static const bool has_capacity = detail::extract_capacity::has_capacity; Chris@16: static const size_t capacity = detail::extract_capacity::capacity + 1; // the queue uses one dummy node Chris@16: static const bool fixed_sized = detail::extract_fixed_sized::value; Chris@16: static const bool node_based = !(has_capacity || fixed_sized); Chris@16: static const bool compile_time_sized = has_capacity; Chris@16: Chris@16: struct BOOST_LOCKFREE_CACHELINE_ALIGNMENT node Chris@16: { Chris@16: typedef typename detail::select_tagged_handle::tagged_handle_type tagged_node_handle; Chris@16: typedef typename detail::select_tagged_handle::handle_type handle_type; Chris@16: Chris@16: node(T const & v, handle_type null_handle): Chris@16: data(v)//, next(tagged_node_handle(0, 0)) Chris@16: { Chris@16: /* increment tag to avoid ABA problem */ Chris@16: tagged_node_handle old_next = next.load(memory_order_relaxed); Chris@16: tagged_node_handle new_next (null_handle, old_next.get_next_tag()); Chris@16: next.store(new_next, memory_order_release); Chris@16: } Chris@16: Chris@16: node (handle_type null_handle): Chris@16: next(tagged_node_handle(null_handle, 0)) Chris@16: {} Chris@16: Chris@16: node(void) Chris@16: {} Chris@16: Chris@16: atomic next; Chris@16: T data; Chris@16: }; Chris@16: Chris@16: typedef typename detail::extract_allocator::type node_allocator; Chris@16: typedef typename detail::select_freelist::type pool_t; Chris@16: typedef typename pool_t::tagged_node_handle tagged_node_handle; Chris@16: typedef typename detail::select_tagged_handle::handle_type handle_type; Chris@16: Chris@16: void initialize(void) Chris@16: { Chris@16: node * n = pool.template construct(pool.null_handle()); Chris@16: tagged_node_handle dummy_node(pool.get_handle(n), 0); Chris@16: head_.store(dummy_node, memory_order_relaxed); Chris@16: tail_.store(dummy_node, memory_order_release); Chris@16: } Chris@16: Chris@16: struct implementation_defined Chris@16: { Chris@16: typedef node_allocator allocator; Chris@16: typedef std::size_t size_type; Chris@16: }; Chris@16: Chris@16: #endif Chris@16: Chris@101: BOOST_DELETED_FUNCTION(queue(queue const&)) Chris@101: BOOST_DELETED_FUNCTION(queue& operator= (queue const&)) Chris@16: Chris@16: public: Chris@16: typedef T value_type; Chris@16: typedef typename implementation_defined::allocator allocator; Chris@16: typedef typename implementation_defined::size_type size_type; Chris@16: Chris@16: /** Chris@16: * \return true, if implementation is lock-free. Chris@16: * Chris@16: * \warning It only checks, if the queue head and tail nodes and the freelist can be modified in a lock-free manner. Chris@16: * On most platforms, the whole implementation is lock-free, if this is true. Using c++0x-style atomics, there is Chris@16: * no possibility to provide a completely accurate implementation, because one would need to test every internal Chris@16: * node, which is impossible if further nodes will be allocated from the operating system. Chris@16: * */ Chris@16: bool is_lock_free (void) const Chris@16: { Chris@16: return head_.is_lock_free() && tail_.is_lock_free() && pool.is_lock_free(); Chris@16: } Chris@16: Chris@16: //! Construct queue Chris@16: // @{ Chris@16: queue(void): Chris@16: head_(tagged_node_handle(0, 0)), Chris@16: tail_(tagged_node_handle(0, 0)), Chris@16: pool(node_allocator(), capacity) Chris@16: { Chris@16: BOOST_ASSERT(has_capacity); Chris@16: initialize(); Chris@16: } Chris@16: Chris@16: template Chris@16: explicit queue(typename node_allocator::template rebind::other const & alloc): Chris@16: head_(tagged_node_handle(0, 0)), Chris@16: tail_(tagged_node_handle(0, 0)), Chris@16: pool(alloc, capacity) Chris@16: { Chris@16: BOOST_STATIC_ASSERT(has_capacity); Chris@16: initialize(); Chris@16: } Chris@16: Chris@16: explicit queue(allocator const & alloc): Chris@16: head_(tagged_node_handle(0, 0)), Chris@16: tail_(tagged_node_handle(0, 0)), Chris@16: pool(alloc, capacity) Chris@16: { Chris@16: BOOST_ASSERT(has_capacity); Chris@16: initialize(); Chris@16: } Chris@16: // @} Chris@16: Chris@16: //! Construct queue, allocate n nodes for the freelist. Chris@16: // @{ Chris@16: explicit queue(size_type n): Chris@16: head_(tagged_node_handle(0, 0)), Chris@16: tail_(tagged_node_handle(0, 0)), Chris@16: pool(node_allocator(), n + 1) Chris@16: { Chris@16: BOOST_ASSERT(!has_capacity); Chris@16: initialize(); Chris@16: } Chris@16: Chris@16: template Chris@16: queue(size_type n, typename node_allocator::template rebind::other const & alloc): Chris@16: head_(tagged_node_handle(0, 0)), Chris@16: tail_(tagged_node_handle(0, 0)), Chris@16: pool(alloc, n + 1) Chris@16: { Chris@16: BOOST_STATIC_ASSERT(!has_capacity); Chris@16: initialize(); Chris@16: } Chris@16: // @} Chris@16: Chris@16: /** \copydoc boost::lockfree::stack::reserve Chris@16: * */ Chris@16: void reserve(size_type n) Chris@16: { Chris@16: pool.template reserve(n); Chris@16: } Chris@16: Chris@16: /** \copydoc boost::lockfree::stack::reserve_unsafe Chris@16: * */ Chris@16: void reserve_unsafe(size_type n) Chris@16: { Chris@16: pool.template reserve(n); Chris@16: } Chris@16: Chris@16: /** Destroys queue, free all nodes from freelist. Chris@16: * */ Chris@16: ~queue(void) Chris@16: { Chris@16: T dummy; Chris@16: while(unsynchronized_pop(dummy)) Chris@16: {} Chris@16: Chris@16: pool.template destruct(head_.load(memory_order_relaxed)); Chris@16: } Chris@16: Chris@16: /** Check if the queue is empty Chris@16: * Chris@16: * \return true, if the queue is empty, false otherwise Chris@16: * \note The result is only accurate, if no other thread modifies the queue. Therefore it is rarely practical to use this Chris@16: * value in program logic. Chris@16: * */ Chris@101: bool empty(void) const Chris@16: { Chris@16: return pool.get_handle(head_.load()) == pool.get_handle(tail_.load()); Chris@16: } Chris@16: Chris@16: /** Pushes object t to the queue. Chris@16: * Chris@16: * \post object will be pushed to the queue, if internal node can be allocated Chris@16: * \returns true, if the push operation is successful. Chris@16: * Chris@16: * \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated Chris@16: * from the OS. This may not be lock-free. Chris@16: * */ Chris@16: bool push(T const & t) Chris@16: { Chris@16: return do_push(t); Chris@16: } Chris@16: Chris@16: /** Pushes object t to the queue. Chris@16: * Chris@16: * \post object will be pushed to the queue, if internal node can be allocated Chris@16: * \returns true, if the push operation is successful. Chris@16: * Chris@16: * \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail Chris@16: * \throws if memory allocator throws Chris@16: * */ Chris@16: bool bounded_push(T const & t) Chris@16: { Chris@16: return do_push(t); Chris@16: } Chris@16: Chris@16: Chris@16: private: Chris@16: #ifndef BOOST_DOXYGEN_INVOKED Chris@16: template Chris@16: bool do_push(T const & t) Chris@16: { Chris@16: using detail::likely; Chris@16: Chris@16: node * n = pool.template construct(t, pool.null_handle()); Chris@16: handle_type node_handle = pool.get_handle(n); Chris@16: Chris@16: if (n == NULL) Chris@16: return false; Chris@16: Chris@16: for (;;) { Chris@16: tagged_node_handle tail = tail_.load(memory_order_acquire); Chris@16: node * tail_node = pool.get_pointer(tail); Chris@16: tagged_node_handle next = tail_node->next.load(memory_order_acquire); Chris@16: node * next_ptr = pool.get_pointer(next); Chris@16: Chris@16: tagged_node_handle tail2 = tail_.load(memory_order_acquire); Chris@16: if (likely(tail == tail2)) { Chris@16: if (next_ptr == 0) { Chris@16: tagged_node_handle new_tail_next(node_handle, next.get_next_tag()); Chris@16: if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) { Chris@16: tagged_node_handle new_tail(node_handle, tail.get_next_tag()); Chris@16: tail_.compare_exchange_strong(tail, new_tail); Chris@16: return true; Chris@16: } Chris@16: } Chris@16: else { Chris@16: tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag()); Chris@16: tail_.compare_exchange_strong(tail, new_tail); Chris@16: } Chris@16: } Chris@16: } Chris@16: } Chris@16: #endif Chris@16: Chris@16: public: Chris@16: Chris@16: /** Pushes object t to the queue. Chris@16: * Chris@16: * \post object will be pushed to the queue, if internal node can be allocated Chris@16: * \returns true, if the push operation is successful. Chris@16: * Chris@16: * \note Not Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated Chris@16: * from the OS. This may not be lock-free. Chris@16: * \throws if memory allocator throws Chris@16: * */ Chris@16: bool unsynchronized_push(T const & t) Chris@16: { Chris@16: node * n = pool.template construct(t, pool.null_handle()); Chris@16: Chris@16: if (n == NULL) Chris@16: return false; Chris@16: Chris@16: for (;;) { Chris@16: tagged_node_handle tail = tail_.load(memory_order_relaxed); Chris@16: tagged_node_handle next = tail->next.load(memory_order_relaxed); Chris@16: node * next_ptr = next.get_ptr(); Chris@16: Chris@16: if (next_ptr == 0) { Chris@16: tail->next.store(tagged_node_handle(n, next.get_next_tag()), memory_order_relaxed); Chris@16: tail_.store(tagged_node_handle(n, tail.get_next_tag()), memory_order_relaxed); Chris@16: return true; Chris@16: } Chris@16: else Chris@16: tail_.store(tagged_node_handle(next_ptr, tail.get_next_tag()), memory_order_relaxed); Chris@16: } Chris@16: } Chris@16: Chris@16: /** Pops object from queue. Chris@16: * Chris@16: * \post if pop operation is successful, object will be copied to ret. Chris@16: * \returns true, if the pop operation is successful, false if queue was empty. Chris@16: * Chris@16: * \note Thread-safe and non-blocking Chris@16: * */ Chris@16: bool pop (T & ret) Chris@16: { Chris@16: return pop(ret); Chris@16: } Chris@16: Chris@16: /** Pops object from queue. Chris@16: * Chris@16: * \pre type U must be constructible by T and copyable, or T must be convertible to U Chris@16: * \post if pop operation is successful, object will be copied to ret. Chris@16: * \returns true, if the pop operation is successful, false if queue was empty. Chris@16: * Chris@16: * \note Thread-safe and non-blocking Chris@16: * */ Chris@16: template Chris@16: bool pop (U & ret) Chris@16: { Chris@16: using detail::likely; Chris@16: for (;;) { Chris@16: tagged_node_handle head = head_.load(memory_order_acquire); Chris@16: node * head_ptr = pool.get_pointer(head); Chris@16: Chris@16: tagged_node_handle tail = tail_.load(memory_order_acquire); Chris@16: tagged_node_handle next = head_ptr->next.load(memory_order_acquire); Chris@16: node * next_ptr = pool.get_pointer(next); Chris@16: Chris@16: tagged_node_handle head2 = head_.load(memory_order_acquire); Chris@16: if (likely(head == head2)) { Chris@16: if (pool.get_handle(head) == pool.get_handle(tail)) { Chris@16: if (next_ptr == 0) Chris@16: return false; Chris@16: Chris@16: tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag()); Chris@16: tail_.compare_exchange_strong(tail, new_tail); Chris@16: Chris@16: } else { Chris@16: if (next_ptr == 0) Chris@16: /* this check is not part of the original algorithm as published by michael and scott Chris@16: * Chris@16: * however we reuse the tagged_ptr part for the freelist and clear the next part during node Chris@16: * allocation. we can observe a null-pointer here. Chris@16: * */ Chris@16: continue; Chris@16: detail::copy_payload(next_ptr->data, ret); Chris@16: Chris@16: tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag()); Chris@16: if (head_.compare_exchange_weak(head, new_head)) { Chris@16: pool.template destruct(head); Chris@16: return true; Chris@16: } Chris@16: } Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: /** Pops object from queue. Chris@16: * Chris@16: * \post if pop operation is successful, object will be copied to ret. Chris@16: * \returns true, if the pop operation is successful, false if queue was empty. Chris@16: * Chris@16: * \note Not thread-safe, but non-blocking Chris@16: * Chris@16: * */ Chris@16: bool unsynchronized_pop (T & ret) Chris@16: { Chris@16: return unsynchronized_pop(ret); Chris@16: } Chris@16: Chris@16: /** Pops object from queue. Chris@16: * Chris@16: * \pre type U must be constructible by T and copyable, or T must be convertible to U Chris@16: * \post if pop operation is successful, object will be copied to ret. Chris@16: * \returns true, if the pop operation is successful, false if queue was empty. Chris@16: * Chris@16: * \note Not thread-safe, but non-blocking Chris@16: * Chris@16: * */ Chris@16: template Chris@16: bool unsynchronized_pop (U & ret) Chris@16: { Chris@16: for (;;) { Chris@16: tagged_node_handle head = head_.load(memory_order_relaxed); Chris@16: node * head_ptr = pool.get_pointer(head); Chris@16: tagged_node_handle tail = tail_.load(memory_order_relaxed); Chris@16: tagged_node_handle next = head_ptr->next.load(memory_order_relaxed); Chris@16: node * next_ptr = pool.get_pointer(next); Chris@16: Chris@16: if (pool.get_handle(head) == pool.get_handle(tail)) { Chris@16: if (next_ptr == 0) Chris@16: return false; Chris@16: Chris@16: tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag()); Chris@16: tail_.store(new_tail); Chris@16: } else { Chris@16: if (next_ptr == 0) Chris@16: /* this check is not part of the original algorithm as published by michael and scott Chris@16: * Chris@16: * however we reuse the tagged_ptr part for the freelist and clear the next part during node Chris@16: * allocation. we can observe a null-pointer here. Chris@16: * */ Chris@16: continue; Chris@16: detail::copy_payload(next_ptr->data, ret); Chris@16: tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag()); Chris@16: head_.store(new_head); Chris@16: pool.template destruct(head); Chris@16: return true; Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: /** consumes one element via a functor Chris@16: * Chris@16: * pops one element from the queue and applies the functor on this object Chris@16: * Chris@16: * \returns true, if one element was consumed Chris@16: * Chris@16: * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking Chris@16: * */ Chris@16: template Chris@16: bool consume_one(Functor & f) Chris@16: { Chris@16: T element; Chris@16: bool success = pop(element); Chris@16: if (success) Chris@16: f(element); Chris@16: Chris@16: return success; Chris@16: } Chris@16: Chris@16: /// \copydoc boost::lockfree::queue::consume_one(Functor & rhs) Chris@16: template Chris@16: bool consume_one(Functor const & f) Chris@16: { Chris@16: T element; Chris@16: bool success = pop(element); Chris@16: if (success) Chris@16: f(element); Chris@16: Chris@16: return success; Chris@16: } Chris@16: Chris@16: /** consumes all elements via a functor Chris@16: * Chris@16: * sequentially pops all elements from the queue and applies the functor on each object Chris@16: * Chris@16: * \returns number of elements that are consumed Chris@16: * Chris@16: * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking Chris@16: * */ Chris@16: template Chris@16: size_t consume_all(Functor & f) Chris@16: { Chris@16: size_t element_count = 0; Chris@16: while (consume_one(f)) Chris@16: element_count += 1; Chris@16: Chris@16: return element_count; Chris@16: } Chris@16: Chris@16: /// \copydoc boost::lockfree::queue::consume_all(Functor & rhs) Chris@16: template Chris@16: size_t consume_all(Functor const & f) Chris@16: { Chris@16: size_t element_count = 0; Chris@16: while (consume_one(f)) Chris@16: element_count += 1; Chris@16: Chris@16: return element_count; Chris@16: } Chris@16: Chris@16: private: Chris@16: #ifndef BOOST_DOXYGEN_INVOKED Chris@16: atomic head_; Chris@16: static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle); Chris@16: char padding1[padding_size]; Chris@16: atomic tail_; Chris@16: char padding2[padding_size]; Chris@16: Chris@16: pool_t pool; Chris@16: #endif Chris@16: }; Chris@16: Chris@16: } /* namespace lockfree */ Chris@16: } /* namespace boost */ Chris@16: Chris@16: #if defined(_MSC_VER) Chris@16: #pragma warning(pop) Chris@16: #endif Chris@16: Chris@16: #endif /* BOOST_LOCKFREE_FIFO_HPP_INCLUDED */