annotate DEPENDENCIES/generic/include/boost/lockfree/spsc_queue.hpp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 // lock-free single-producer/single-consumer ringbuffer
Chris@16 2 // this algorithm is implemented in various projects (linux kernel)
Chris@16 3 //
Chris@16 4 // Copyright (C) 2009-2013 Tim Blechmann
Chris@16 5 //
Chris@16 6 // Distributed under the Boost Software License, Version 1.0. (See
Chris@16 7 // accompanying file LICENSE_1_0.txt or copy at
Chris@16 8 // http://www.boost.org/LICENSE_1_0.txt)
Chris@16 9
Chris@16 10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
Chris@16 11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
Chris@16 12
Chris@16 13 #include <algorithm>
Chris@16 14 #include <memory>
Chris@16 15
Chris@16 16 #include <boost/aligned_storage.hpp>
Chris@16 17 #include <boost/assert.hpp>
Chris@16 18 #include <boost/static_assert.hpp>
Chris@16 19 #include <boost/utility.hpp>
Chris@101 20 #include <boost/utility/enable_if.hpp>
Chris@16 21
Chris@16 22 #include <boost/type_traits/has_trivial_destructor.hpp>
Chris@101 23 #include <boost/type_traits/is_convertible.hpp>
Chris@16 24
Chris@16 25 #include <boost/lockfree/detail/atomic.hpp>
Chris@16 26 #include <boost/lockfree/detail/branch_hints.hpp>
Chris@101 27 #include <boost/lockfree/detail/copy_payload.hpp>
Chris@16 28 #include <boost/lockfree/detail/parameter.hpp>
Chris@16 29 #include <boost/lockfree/detail/prefix.hpp>
Chris@16 30
Chris@101 31 #ifdef BOOST_HAS_PRAGMA_ONCE
Chris@101 32 #pragma once
Chris@101 33 #endif
Chris@16 34
Chris@16 35 namespace boost {
Chris@16 36 namespace lockfree {
Chris@16 37 namespace detail {
Chris@16 38
// Parameter signature used to bind the policy arguments of the queue:
// both the capacity tag and the allocator tag are declared optional.
Chris@16 39 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
Chris@16 40 boost::parameter::optional<tag::allocator>
Chris@16 41 > ringbuffer_signature;
Chris@16 42
Chris@16 43 template <typename T>
Chris@16 44 class ringbuffer_base
Chris@16 45 {
Chris@16 46 #ifndef BOOST_DOXYGEN_INVOKED
Chris@101 47 protected:
Chris@16 48 typedef std::size_t size_t;
Chris@16 49 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
Chris@16 50 atomic<size_t> write_index_;
Chris@16 51 char padding1[padding_size]; /* force read_index and write_index to different cache lines */
Chris@16 52 atomic<size_t> read_index_;
Chris@16 53
Chris@101 54 BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
Chris@101 55 BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))
Chris@16 56
Chris@16 57 protected:
Chris@16 58 ringbuffer_base(void):
Chris@16 59 write_index_(0), read_index_(0)
Chris@16 60 {}
Chris@16 61
Chris@16 62 static size_t next_index(size_t arg, size_t max_size)
Chris@16 63 {
Chris@16 64 size_t ret = arg + 1;
Chris@16 65 while (unlikely(ret >= max_size))
Chris@16 66 ret -= max_size;
Chris@16 67 return ret;
Chris@16 68 }
Chris@16 69
Chris@16 70 static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
Chris@16 71 {
Chris@16 72 if (write_index >= read_index)
Chris@16 73 return write_index - read_index;
Chris@16 74
Chris@101 75 const size_t ret = write_index + max_size - read_index;
Chris@16 76 return ret;
Chris@16 77 }
Chris@16 78
Chris@16 79 static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
Chris@16 80 {
Chris@16 81 size_t ret = read_index - write_index - 1;
Chris@16 82 if (write_index >= read_index)
Chris@16 83 ret += max_size;
Chris@16 84 return ret;
Chris@16 85 }
Chris@16 86
Chris@101 87 size_t read_available(size_t max_size) const
Chris@101 88 {
Chris@101 89 size_t write_index = write_index_.load(memory_order_relaxed);
Chris@101 90 const size_t read_index = read_index_.load(memory_order_relaxed);
Chris@101 91 return read_available(write_index, read_index, max_size);
Chris@101 92 }
Chris@101 93
Chris@101 94 size_t write_available(size_t max_size) const
Chris@101 95 {
Chris@101 96 size_t write_index = write_index_.load(memory_order_relaxed);
Chris@101 97 const size_t read_index = read_index_.load(memory_order_relaxed);
Chris@101 98 return write_available(write_index, read_index, max_size);
Chris@101 99 }
Chris@101 100
Chris@16 101 bool push(T const & t, T * buffer, size_t max_size)
Chris@16 102 {
Chris@16 103 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
Chris@16 104 const size_t next = next_index(write_index, max_size);
Chris@16 105
Chris@16 106 if (next == read_index_.load(memory_order_acquire))
Chris@16 107 return false; /* ringbuffer is full */
Chris@16 108
Chris@16 109 new (buffer + write_index) T(t); // copy-construct
Chris@16 110
Chris@16 111 write_index_.store(next, memory_order_release);
Chris@16 112
Chris@16 113 return true;
Chris@16 114 }
Chris@16 115
Chris@16 116 size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
Chris@16 117 {
Chris@16 118 return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
Chris@16 119 }
Chris@16 120
Chris@16 121 template <typename ConstIterator>
Chris@16 122 ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
Chris@16 123 {
Chris@16 124 // FIXME: avoid std::distance
Chris@16 125
Chris@16 126 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
Chris@16 127 const size_t read_index = read_index_.load(memory_order_acquire);
Chris@16 128 const size_t avail = write_available(write_index, read_index, max_size);
Chris@16 129
Chris@16 130 if (avail == 0)
Chris@16 131 return begin;
Chris@16 132
Chris@16 133 size_t input_count = std::distance(begin, end);
Chris@16 134 input_count = (std::min)(input_count, avail);
Chris@16 135
Chris@16 136 size_t new_write_index = write_index + input_count;
Chris@16 137
Chris@16 138 const ConstIterator last = boost::next(begin, input_count);
Chris@16 139
Chris@16 140 if (write_index + input_count > max_size) {
Chris@16 141 /* copy data in two sections */
Chris@16 142 const size_t count0 = max_size - write_index;
Chris@16 143 const ConstIterator midpoint = boost::next(begin, count0);
Chris@16 144
Chris@16 145 std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
Chris@16 146 std::uninitialized_copy(midpoint, last, internal_buffer);
Chris@16 147 new_write_index -= max_size;
Chris@16 148 } else {
Chris@16 149 std::uninitialized_copy(begin, last, internal_buffer + write_index);
Chris@16 150
Chris@16 151 if (new_write_index == max_size)
Chris@16 152 new_write_index = 0;
Chris@16 153 }
Chris@16 154
Chris@16 155 write_index_.store(new_write_index, memory_order_release);
Chris@16 156 return last;
Chris@16 157 }
Chris@16 158
Chris@101 159 template <typename Functor>
Chris@101 160 bool consume_one(Functor & functor, T * buffer, size_t max_size)
Chris@16 161 {
Chris@16 162 const size_t write_index = write_index_.load(memory_order_acquire);
Chris@16 163 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@101 164 if ( empty(write_index, read_index) )
Chris@16 165 return false;
Chris@16 166
Chris@101 167 T & object_to_consume = buffer[read_index];
Chris@101 168 functor( object_to_consume );
Chris@101 169 object_to_consume.~T();
Chris@16 170
Chris@16 171 size_t next = next_index(read_index, max_size);
Chris@16 172 read_index_.store(next, memory_order_release);
Chris@16 173 return true;
Chris@16 174 }
Chris@16 175
Chris@101 176 template <typename Functor>
Chris@101 177 bool consume_one(Functor const & functor, T * buffer, size_t max_size)
Chris@101 178 {
Chris@101 179 const size_t write_index = write_index_.load(memory_order_acquire);
Chris@101 180 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@101 181 if ( empty(write_index, read_index) )
Chris@101 182 return false;
Chris@101 183
Chris@101 184 T & object_to_consume = buffer[read_index];
Chris@101 185 functor( object_to_consume );
Chris@101 186 object_to_consume.~T();
Chris@101 187
Chris@101 188 size_t next = next_index(read_index, max_size);
Chris@101 189 read_index_.store(next, memory_order_release);
Chris@101 190 return true;
Chris@101 191 }
Chris@101 192
Chris@101 193 template <typename Functor>
Chris@101 194 size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
Chris@101 195 {
Chris@101 196 const size_t write_index = write_index_.load(memory_order_acquire);
Chris@101 197 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@101 198
Chris@101 199 const size_t avail = read_available(write_index, read_index, max_size);
Chris@101 200
Chris@101 201 if (avail == 0)
Chris@101 202 return 0;
Chris@101 203
Chris@101 204 const size_t output_count = avail;
Chris@101 205
Chris@101 206 size_t new_read_index = read_index + output_count;
Chris@101 207
Chris@101 208 if (read_index + output_count > max_size) {
Chris@101 209 /* copy data in two sections */
Chris@101 210 const size_t count0 = max_size - read_index;
Chris@101 211 const size_t count1 = output_count - count0;
Chris@101 212
Chris@101 213 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
Chris@101 214 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
Chris@101 215
Chris@101 216 new_read_index -= max_size;
Chris@101 217 } else {
Chris@101 218 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
Chris@101 219
Chris@101 220 if (new_read_index == max_size)
Chris@101 221 new_read_index = 0;
Chris@101 222 }
Chris@101 223
Chris@101 224 read_index_.store(new_read_index, memory_order_release);
Chris@101 225 return output_count;
Chris@101 226 }
Chris@101 227
Chris@101 228 template <typename Functor>
Chris@101 229 size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
Chris@101 230 {
Chris@101 231 const size_t write_index = write_index_.load(memory_order_acquire);
Chris@101 232 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@101 233
Chris@101 234 const size_t avail = read_available(write_index, read_index, max_size);
Chris@101 235
Chris@101 236 if (avail == 0)
Chris@101 237 return 0;
Chris@101 238
Chris@101 239 const size_t output_count = avail;
Chris@101 240
Chris@101 241 size_t new_read_index = read_index + output_count;
Chris@101 242
Chris@101 243 if (read_index + output_count > max_size) {
Chris@101 244 /* copy data in two sections */
Chris@101 245 const size_t count0 = max_size - read_index;
Chris@101 246 const size_t count1 = output_count - count0;
Chris@101 247
Chris@101 248 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
Chris@101 249 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
Chris@101 250
Chris@101 251 new_read_index -= max_size;
Chris@101 252 } else {
Chris@101 253 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
Chris@101 254
Chris@101 255 if (new_read_index == max_size)
Chris@101 256 new_read_index = 0;
Chris@101 257 }
Chris@101 258
Chris@101 259 read_index_.store(new_read_index, memory_order_release);
Chris@101 260 return output_count;
Chris@101 261 }
Chris@101 262
Chris@16 263 size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
Chris@16 264 {
Chris@16 265 const size_t write_index = write_index_.load(memory_order_acquire);
Chris@16 266 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@16 267
Chris@16 268 const size_t avail = read_available(write_index, read_index, max_size);
Chris@16 269
Chris@16 270 if (avail == 0)
Chris@16 271 return 0;
Chris@16 272
Chris@16 273 output_count = (std::min)(output_count, avail);
Chris@16 274
Chris@16 275 size_t new_read_index = read_index + output_count;
Chris@16 276
Chris@16 277 if (read_index + output_count > max_size) {
Chris@16 278 /* copy data in two sections */
Chris@16 279 const size_t count0 = max_size - read_index;
Chris@16 280 const size_t count1 = output_count - count0;
Chris@16 281
Chris@16 282 copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
Chris@16 283 copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
Chris@16 284
Chris@16 285 new_read_index -= max_size;
Chris@16 286 } else {
Chris@16 287 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
Chris@16 288 if (new_read_index == max_size)
Chris@16 289 new_read_index = 0;
Chris@16 290 }
Chris@16 291
Chris@16 292 read_index_.store(new_read_index, memory_order_release);
Chris@16 293 return output_count;
Chris@16 294 }
Chris@16 295
Chris@16 296 template <typename OutputIterator>
Chris@101 297 size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
Chris@16 298 {
Chris@16 299 const size_t write_index = write_index_.load(memory_order_acquire);
Chris@16 300 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@16 301
Chris@16 302 const size_t avail = read_available(write_index, read_index, max_size);
Chris@16 303 if (avail == 0)
Chris@16 304 return 0;
Chris@16 305
Chris@16 306 size_t new_read_index = read_index + avail;
Chris@16 307
Chris@16 308 if (read_index + avail > max_size) {
Chris@16 309 /* copy data in two sections */
Chris@16 310 const size_t count0 = max_size - read_index;
Chris@16 311 const size_t count1 = avail - count0;
Chris@16 312
Chris@16 313 it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
Chris@16 314 copy_and_delete(internal_buffer, internal_buffer + count1, it);
Chris@16 315
Chris@16 316 new_read_index -= max_size;
Chris@16 317 } else {
Chris@16 318 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
Chris@16 319 if (new_read_index == max_size)
Chris@16 320 new_read_index = 0;
Chris@16 321 }
Chris@16 322
Chris@16 323 read_index_.store(new_read_index, memory_order_release);
Chris@16 324 return avail;
Chris@16 325 }
Chris@101 326
Chris@101 327 const T& front(const T * internal_buffer) const
Chris@101 328 {
Chris@101 329 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@101 330 return *(internal_buffer + read_index);
Chris@101 331 }
Chris@101 332
Chris@101 333 T& front(T * internal_buffer)
Chris@101 334 {
Chris@101 335 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
Chris@101 336 return *(internal_buffer + read_index);
Chris@101 337 }
Chris@16 338 #endif
Chris@16 339
Chris@16 340
Chris@16 341 public:
Chris@16 342 /** reset the ringbuffer
Chris@16 343 *
Chris@16 344 * \note Not thread-safe
Chris@16 345 * */
Chris@16 346 void reset(void)
Chris@16 347 {
Chris@101 348 if ( !boost::has_trivial_destructor<T>::value ) {
Chris@101 349 // make sure to call all destructors!
Chris@101 350
Chris@101 351 T dummy_element;
Chris@101 352 while (pop(dummy_element))
Chris@101 353 {}
Chris@101 354 } else {
Chris@101 355 write_index_.store(0, memory_order_relaxed);
Chris@101 356 read_index_.store(0, memory_order_release);
Chris@101 357 }
Chris@16 358 }
Chris@16 359
Chris@16 360 /** Check if the ringbuffer is empty
Chris@16 361 *
Chris@16 362 * \return true, if the ringbuffer is empty, false otherwise
Chris@16 363 * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
Chris@16 364 * */
Chris@16 365 bool empty(void)
Chris@16 366 {
Chris@16 367 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
Chris@16 368 }
Chris@16 369
Chris@16 370 /**
Chris@16 371 * \return true, if implementation is lock-free.
Chris@16 372 *
Chris@16 373 * */
Chris@16 374 bool is_lock_free(void) const
Chris@16 375 {
Chris@16 376 return write_index_.is_lock_free() && read_index_.is_lock_free();
Chris@16 377 }
Chris@16 378
Chris@16 379 private:
Chris@16 380 bool empty(size_t write_index, size_t read_index)
Chris@16 381 {
Chris@16 382 return write_index == read_index;
Chris@16 383 }
Chris@16 384
Chris@16 385 template< class OutputIterator >
Chris@16 386 OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
Chris@16 387 {
Chris@16 388 if (boost::has_trivial_destructor<T>::value) {
Chris@16 389 return std::copy(first, last, out); // will use memcpy if possible
Chris@16 390 } else {
Chris@16 391 for (; first != last; ++first, ++out) {
Chris@16 392 *out = *first;
Chris@16 393 first->~T();
Chris@16 394 }
Chris@16 395 return out;
Chris@16 396 }
Chris@16 397 }
Chris@101 398
Chris@101 399 template< class Functor >
Chris@101 400 void run_functor_and_delete( T * first, T * last, Functor & functor )
Chris@101 401 {
Chris@101 402 for (; first != last; ++first) {
Chris@101 403 functor(*first);
Chris@101 404 first->~T();
Chris@101 405 }
Chris@101 406 }
Chris@101 407
Chris@101 408 template< class Functor >
Chris@101 409 void run_functor_and_delete( T * first, T * last, Functor const & functor )
Chris@101 410 {
Chris@101 411 for (; first != last; ++first) {
Chris@101 412 functor(*first);
Chris@101 413 first->~T();
Chris@101 414 }
Chris@101 415 }
Chris@16 416 };
Chris@16 417
Chris@16 418 template <typename T, std::size_t MaxSize>
Chris@16 419 class compile_time_sized_ringbuffer:
Chris@16 420 public ringbuffer_base<T>
Chris@16 421 {
Chris@16 422 typedef std::size_t size_type;
Chris@16 423 static const std::size_t max_size = MaxSize + 1;
Chris@16 424
Chris@16 425 typedef typename boost::aligned_storage<max_size * sizeof(T),
Chris@16 426 boost::alignment_of<T>::value
Chris@16 427 >::type storage_type;
Chris@16 428
Chris@16 429 storage_type storage_;
Chris@16 430
Chris@16 431 T * data()
Chris@16 432 {
Chris@16 433 return static_cast<T*>(storage_.address());
Chris@16 434 }
Chris@16 435
Chris@101 436 const T * data() const
Chris@101 437 {
Chris@101 438 return static_cast<const T*>(storage_.address());
Chris@101 439 }
Chris@101 440
Chris@101 441 protected:
Chris@101 442 size_type max_number_of_elements() const
Chris@101 443 {
Chris@101 444 return max_size;
Chris@101 445 }
Chris@101 446
Chris@16 447 public:
Chris@16 448 bool push(T const & t)
Chris@16 449 {
Chris@16 450 return ringbuffer_base<T>::push(t, data(), max_size);
Chris@16 451 }
Chris@16 452
Chris@101 453 template <typename Functor>
Chris@101 454 bool consume_one(Functor & f)
Chris@16 455 {
Chris@101 456 return ringbuffer_base<T>::consume_one(f, data(), max_size);
Chris@101 457 }
Chris@101 458
Chris@101 459 template <typename Functor>
Chris@101 460 bool consume_one(Functor const & f)
Chris@101 461 {
Chris@101 462 return ringbuffer_base<T>::consume_one(f, data(), max_size);
Chris@101 463 }
Chris@101 464
Chris@101 465 template <typename Functor>
Chris@101 466 size_type consume_all(Functor & f)
Chris@101 467 {
Chris@101 468 return ringbuffer_base<T>::consume_all(f, data(), max_size);
Chris@101 469 }
Chris@101 470
Chris@101 471 template <typename Functor>
Chris@101 472 size_type consume_all(Functor const & f)
Chris@101 473 {
Chris@101 474 return ringbuffer_base<T>::consume_all(f, data(), max_size);
Chris@16 475 }
Chris@16 476
Chris@16 477 size_type push(T const * t, size_type size)
Chris@16 478 {
Chris@16 479 return ringbuffer_base<T>::push(t, size, data(), max_size);
Chris@16 480 }
Chris@16 481
Chris@16 482 template <size_type size>
Chris@16 483 size_type push(T const (&t)[size])
Chris@16 484 {
Chris@16 485 return push(t, size);
Chris@16 486 }
Chris@16 487
Chris@16 488 template <typename ConstIterator>
Chris@16 489 ConstIterator push(ConstIterator begin, ConstIterator end)
Chris@16 490 {
Chris@16 491 return ringbuffer_base<T>::push(begin, end, data(), max_size);
Chris@16 492 }
Chris@16 493
Chris@16 494 size_type pop(T * ret, size_type size)
Chris@16 495 {
Chris@16 496 return ringbuffer_base<T>::pop(ret, size, data(), max_size);
Chris@16 497 }
Chris@16 498
Chris@101 499 template <typename OutputIterator>
Chris@101 500 size_type pop_to_output_iterator(OutputIterator it)
Chris@16 501 {
Chris@101 502 return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
Chris@16 503 }
Chris@16 504
Chris@101 505 const T& front(void) const
Chris@16 506 {
Chris@101 507 return ringbuffer_base<T>::front(data());
Chris@101 508 }
Chris@101 509
Chris@101 510 T& front(void)
Chris@101 511 {
Chris@101 512 return ringbuffer_base<T>::front(data());
Chris@16 513 }
Chris@16 514 };
Chris@16 515
Chris@16 516 template <typename T, typename Alloc>
Chris@16 517 class runtime_sized_ringbuffer:
Chris@16 518 public ringbuffer_base<T>,
Chris@16 519 private Alloc
Chris@16 520 {
Chris@16 521 typedef std::size_t size_type;
Chris@16 522 size_type max_elements_;
Chris@16 523 typedef typename Alloc::pointer pointer;
Chris@16 524 pointer array_;
Chris@16 525
Chris@101 526 protected:
Chris@101 527 size_type max_number_of_elements() const
Chris@101 528 {
Chris@101 529 return max_elements_;
Chris@101 530 }
Chris@101 531
Chris@16 532 public:
Chris@16 533 explicit runtime_sized_ringbuffer(size_type max_elements):
Chris@16 534 max_elements_(max_elements + 1)
Chris@16 535 {
Chris@16 536 array_ = Alloc::allocate(max_elements_);
Chris@16 537 }
Chris@16 538
Chris@16 539 template <typename U>
Chris@16 540 runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements):
Chris@16 541 Alloc(alloc), max_elements_(max_elements + 1)
Chris@16 542 {
Chris@16 543 array_ = Alloc::allocate(max_elements_);
Chris@16 544 }
Chris@16 545
Chris@16 546 runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
Chris@16 547 Alloc(alloc), max_elements_(max_elements + 1)
Chris@16 548 {
Chris@16 549 array_ = Alloc::allocate(max_elements_);
Chris@16 550 }
Chris@16 551
Chris@16 552 ~runtime_sized_ringbuffer(void)
Chris@16 553 {
Chris@16 554 // destroy all remaining items
Chris@16 555 T out;
Chris@101 556 while (pop(&out, 1)) {}
Chris@16 557
Chris@16 558 Alloc::deallocate(array_, max_elements_);
Chris@16 559 }
Chris@16 560
Chris@16 561 bool push(T const & t)
Chris@16 562 {
Chris@16 563 return ringbuffer_base<T>::push(t, &*array_, max_elements_);
Chris@16 564 }
Chris@16 565
Chris@101 566 template <typename Functor>
Chris@101 567 bool consume_one(Functor & f)
Chris@16 568 {
Chris@101 569 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
Chris@101 570 }
Chris@101 571
Chris@101 572 template <typename Functor>
Chris@101 573 bool consume_one(Functor const & f)
Chris@101 574 {
Chris@101 575 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
Chris@101 576 }
Chris@101 577
Chris@101 578 template <typename Functor>
Chris@101 579 size_type consume_all(Functor & f)
Chris@101 580 {
Chris@101 581 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
Chris@101 582 }
Chris@101 583
Chris@101 584 template <typename Functor>
Chris@101 585 size_type consume_all(Functor const & f)
Chris@101 586 {
Chris@101 587 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
Chris@16 588 }
Chris@16 589
Chris@16 590 size_type push(T const * t, size_type size)
Chris@16 591 {
Chris@16 592 return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
Chris@16 593 }
Chris@16 594
Chris@16 595 template <size_type size>
Chris@16 596 size_type push(T const (&t)[size])
Chris@16 597 {
Chris@16 598 return push(t, size);
Chris@16 599 }
Chris@16 600
Chris@16 601 template <typename ConstIterator>
Chris@16 602 ConstIterator push(ConstIterator begin, ConstIterator end)
Chris@16 603 {
Chris@16 604 return ringbuffer_base<T>::push(begin, end, array_, max_elements_);
Chris@16 605 }
Chris@16 606
Chris@16 607 size_type pop(T * ret, size_type size)
Chris@16 608 {
Chris@16 609 return ringbuffer_base<T>::pop(ret, size, array_, max_elements_);
Chris@16 610 }
Chris@16 611
Chris@101 612 template <typename OutputIterator>
Chris@101 613 size_type pop_to_output_iterator(OutputIterator it)
Chris@16 614 {
Chris@101 615 return ringbuffer_base<T>::pop_to_output_iterator(it, array_, max_elements_);
Chris@16 616 }
Chris@16 617
Chris@101 618 const T& front(void) const
Chris@16 619 {
Chris@101 620 return ringbuffer_base<T>::front(array_);
Chris@101 621 }
Chris@101 622
Chris@101 623 T& front(void)
Chris@101 624 {
Chris@101 625 return ringbuffer_base<T>::front(array_);
Chris@16 626 }
Chris@16 627 };
Chris@16 628
Chris@16 629 template <typename T, typename A0, typename A1>
Chris@16 630 struct make_ringbuffer
Chris@16 631 {
Chris@16 632 typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
Chris@16 633
Chris@16 634 typedef extract_capacity<bound_args> extract_capacity_t;
Chris@16 635
Chris@16 636 static const bool runtime_sized = !extract_capacity_t::has_capacity;
Chris@16 637 static const size_t capacity = extract_capacity_t::capacity;
Chris@16 638
Chris@16 639 typedef extract_allocator<bound_args, T> extract_allocator_t;
Chris@16 640 typedef typename extract_allocator_t::type allocator;
Chris@16 641
Chris@16 642 // allocator argument is only sane, for run-time sized ringbuffers
Chris@16 643 BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
Chris@16 644 mpl::bool_<!extract_allocator_t::has_allocator>,
Chris@16 645 mpl::true_
Chris@16 646 >::type::value));
Chris@16 647
Chris@16 648 typedef typename mpl::if_c<runtime_sized,
Chris@16 649 runtime_sized_ringbuffer<T, allocator>,
Chris@16 650 compile_time_sized_ringbuffer<T, capacity>
Chris@16 651 >::type ringbuffer_type;
Chris@16 652 };
Chris@16 653
Chris@16 654
Chris@16 655 } /* namespace detail */
Chris@16 656
Chris@16 657
Chris@16 658 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
Chris@16 659 *
Chris@16 660 * \b Policies:
Chris@16 661 * - \c boost::lockfree::capacity<>, optional <br>
Chris@16 662 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
Chris@16 663 *
Chris@16 664 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
Chris@16 665 * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
Chris@16 666 * to be sized at run-time
Chris@16 667 *
Chris@16 668 * \b Requirements:
Chris@16 669 * - T must have a default constructor
Chris@16 670 * - T must be copyable
Chris@16 671 * */
Chris@16 672 #ifndef BOOST_DOXYGEN_INVOKED
Chris@16 673 template <typename T,
Chris@16 674 class A0 = boost::parameter::void_,
Chris@16 675 class A1 = boost::parameter::void_>
Chris@16 676 #else
Chris@16 677 template <typename T, ...Options>
Chris@16 678 #endif
Chris@16 679 class spsc_queue:
Chris@16 680 public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
Chris@16 681 {
Chris@16 682 private:
Chris@16 683
Chris@16 684 #ifndef BOOST_DOXYGEN_INVOKED
Chris@16 685 typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
Chris@16 686 static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
Chris@16 687 typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
Chris@16 688
Chris@16 689 struct implementation_defined
Chris@16 690 {
Chris@16 691 typedef allocator_arg allocator;
Chris@16 692 typedef std::size_t size_type;
Chris@16 693 };
Chris@16 694 #endif
Chris@16 695
Chris@16 696 public:
Chris@16 697 typedef T value_type;
Chris@16 698 typedef typename implementation_defined::allocator allocator;
Chris@16 699 typedef typename implementation_defined::size_type size_type;
Chris@16 700
Chris@16 701 /** Constructs a spsc_queue
Chris@16 702 *
Chris@16 703 * \pre spsc_queue must be configured to be sized at compile-time
Chris@16 704 */
Chris@16 705 // @{
Chris@16 706 spsc_queue(void)
Chris@16 707 {
Chris@16 708 BOOST_ASSERT(!runtime_sized);
Chris@16 709 }
Chris@16 710
Chris@16 711 template <typename U>
Chris@16 712 explicit spsc_queue(typename allocator::template rebind<U>::other const & alloc)
Chris@16 713 {
Chris@16 714 // just for API compatibility: we don't actually need an allocator
Chris@16 715 BOOST_STATIC_ASSERT(!runtime_sized);
Chris@16 716 }
Chris@16 717
Chris@16 718 explicit spsc_queue(allocator const & alloc)
Chris@16 719 {
Chris@16 720 // just for API compatibility: we don't actually need an allocator
Chris@16 721 BOOST_ASSERT(!runtime_sized);
Chris@16 722 }
Chris@16 723 // @}
Chris@16 724
Chris@16 725
Chris@16 726 /** Constructs a spsc_queue for element_count elements
Chris@16 727 *
Chris@16 728 * \pre spsc_queue must be configured to be sized at run-time
Chris@16 729 */
Chris@16 730 // @{
Chris@16 731 explicit spsc_queue(size_type element_count):
Chris@16 732 base_type(element_count)
Chris@16 733 {
Chris@16 734 BOOST_ASSERT(runtime_sized);
Chris@16 735 }
Chris@16 736
Chris@16 737 template <typename U>
Chris@16 738 spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
Chris@16 739 base_type(alloc, element_count)
Chris@16 740 {
Chris@16 741 BOOST_STATIC_ASSERT(runtime_sized);
Chris@16 742 }
Chris@16 743
Chris@16 744 spsc_queue(size_type element_count, allocator_arg const & alloc):
Chris@16 745 base_type(alloc, element_count)
Chris@16 746 {
Chris@16 747 BOOST_ASSERT(runtime_sized);
Chris@16 748 }
Chris@16 749 // @}
Chris@16 750
Chris@16 751 /** Pushes object t to the ringbuffer.
Chris@16 752 *
Chris@16 753 * \pre only one thread is allowed to push data to the spsc_queue
Chris@16 754 * \post object will be pushed to the spsc_queue, unless it is full.
Chris@16 755 * \return true, if the push operation is successful.
Chris@16 756 *
Chris@16 757 * \note Thread-safe and wait-free
Chris@16 758 * */
Chris@16 759 bool push(T const & t)
Chris@16 760 {
Chris@16 761 return base_type::push(t);
Chris@16 762 }
Chris@16 763
Chris@16 764 /** Pops one object from ringbuffer.
Chris@16 765 *
Chris@16 766 * \pre only one thread is allowed to pop data from the spsc_queue
Chris@101 767 * \post if ringbuffer is not empty, object will be discarded.
Chris@101 768 * \return true, if the pop operation is successful, false if ringbuffer was empty.
Chris@101 769 *
Chris@101 770 * \note Thread-safe and wait-free
Chris@101 771 */
Chris@101 772 bool pop ()
Chris@101 773 {
Chris@101 774 detail::consume_noop consume_functor;
Chris@101 775 return consume_one( consume_functor );
Chris@101 776 }
Chris@101 777
Chris@101 778 /** Pops one object from ringbuffer.
Chris@101 779 *
Chris@101 780 * \pre only one thread is allowed to pop data from the spsc_queue
Chris@16 781 * \post if ringbuffer is not empty, object will be copied to ret.
Chris@16 782 * \return true, if the pop operation is successful, false if ringbuffer was empty.
Chris@16 783 *
Chris@16 784 * \note Thread-safe and wait-free
Chris@16 785 */
Chris@101 786 template <typename U>
Chris@101 787 typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
Chris@101 788 pop (U & ret)
Chris@16 789 {
Chris@101 790 detail::consume_via_copy<U> consume_functor(ret);
Chris@101 791 return consume_one( consume_functor );
Chris@16 792 }
Chris@16 793
Chris@16 794 /** Pushes as many objects from the array t as there is space.
Chris@16 795 *
Chris@16 796 * \pre only one thread is allowed to push data to the spsc_queue
Chris@16 797 * \return number of pushed items
Chris@16 798 *
Chris@16 799 * \note Thread-safe and wait-free
Chris@16 800 */
Chris@16 801 size_type push(T const * t, size_type size)
Chris@16 802 {
Chris@16 803 return base_type::push(t, size);
Chris@16 804 }
Chris@16 805
Chris@16 806 /** Pushes as many objects from the array t as there is space available.
Chris@16 807 *
Chris@16 808 * \pre only one thread is allowed to push data to the spsc_queue
Chris@16 809 * \return number of pushed items
Chris@16 810 *
Chris@16 811 * \note Thread-safe and wait-free
Chris@16 812 */
Chris@16 813 template <size_type size>
Chris@16 814 size_type push(T const (&t)[size])
Chris@16 815 {
Chris@16 816 return push(t, size);
Chris@16 817 }
Chris@16 818
Chris@16 819 /** Pushes as many objects from the range [begin, end) as there is space.
Chris@16 820 *
Chris@16 821 * \pre only one thread is allowed to push data to the spsc_queue
Chris@16 822 * \return iterator to the first element, which has not been pushed
Chris@16 823 *
Chris@16 824 * \note Thread-safe and wait-free
Chris@16 825 */
Chris@16 826 template <typename ConstIterator>
Chris@16 827 ConstIterator push(ConstIterator begin, ConstIterator end)
Chris@16 828 {
Chris@16 829 return base_type::push(begin, end);
Chris@16 830 }
Chris@16 831
Chris@16 832 /** Pops a maximum of size objects from ringbuffer.
Chris@16 833 *
Chris@16 834 * \pre only one thread is allowed to pop data from the spsc_queue
Chris@16 835 * \return number of popped items
Chris@16 836 *
Chris@16 837 * \note Thread-safe and wait-free
Chris@16 838 * */
Chris@16 839 size_type pop(T * ret, size_type size)
Chris@16 840 {
Chris@16 841 return base_type::pop(ret, size);
Chris@16 842 }
Chris@16 843
Chris@16 844 /** Pops a maximum of size objects from spsc_queue.
Chris@16 845 *
Chris@16 846 * \pre only one thread is allowed to pop data from the spsc_queue
Chris@16 847 * \return number of popped items
Chris@16 848 *
Chris@16 849 * \note Thread-safe and wait-free
Chris@16 850 * */
Chris@16 851 template <size_type size>
Chris@16 852 size_type pop(T (&ret)[size])
Chris@16 853 {
Chris@16 854 return pop(ret, size);
Chris@16 855 }
Chris@16 856
Chris@16 857 /** Pops objects to the output iterator it
Chris@16 858 *
Chris@16 859 * \pre only one thread is allowed to pop data from the spsc_queue
Chris@16 860 * \return number of popped items
Chris@16 861 *
Chris@16 862 * \note Thread-safe and wait-free
Chris@16 863 * */
Chris@16 864 template <typename OutputIterator>
Chris@101 865 typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
Chris@101 866 pop(OutputIterator it)
Chris@16 867 {
Chris@101 868 return base_type::pop_to_output_iterator(it);
Chris@16 869 }
Chris@16 870
Chris@16 871 /** consumes one element via a functor
Chris@16 872 *
Chris@16 873 * pops one element from the queue and applies the functor on this object
Chris@16 874 *
Chris@16 875 * \returns true, if one element was consumed
Chris@16 876 *
Chris@16 877 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
Chris@16 878 * */
Chris@16 879 template <typename Functor>
Chris@16 880 bool consume_one(Functor & f)
Chris@16 881 {
Chris@101 882 return base_type::consume_one(f);
Chris@16 883 }
Chris@16 884
Chris@16 885 /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
Chris@16 886 template <typename Functor>
Chris@16 887 bool consume_one(Functor const & f)
Chris@16 888 {
Chris@101 889 return base_type::consume_one(f);
Chris@16 890 }
Chris@16 891
Chris@16 892 /** consumes all elements via a functor
Chris@16 893 *
Chris@16 894 * sequentially pops all elements from the queue and applies the functor on each object
Chris@16 895 *
Chris@16 896 * \returns number of elements that are consumed
Chris@16 897 *
Chris@16 898 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
Chris@16 899 * */
Chris@16 900 template <typename Functor>
Chris@16 901 size_type consume_all(Functor & f)
Chris@16 902 {
Chris@101 903 return base_type::consume_all(f);
Chris@16 904 }
Chris@16 905
Chris@16 906 /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
Chris@16 907 template <typename Functor>
Chris@16 908 size_type consume_all(Functor const & f)
Chris@16 909 {
Chris@101 910 return base_type::consume_all(f);
Chris@101 911 }
Chris@16 912
Chris@101 913 /** get number of elements that are available for read
Chris@101 914 *
Chris@101 915 * \return number of available elements that can be popped from the spsc_queue
Chris@101 916 *
Chris@101 917 * \note Thread-safe and wait-free, should only be called from the consumer thread
Chris@101 918 * */
Chris@101 919 size_type read_available() const
Chris@101 920 {
Chris@101 921 return base_type::read_available(base_type::max_number_of_elements());
Chris@16 922 }
Chris@101 923
Chris@101 924 /** get write space to write elements
Chris@101 925 *
Chris@101 926 * \return number of elements that can be pushed to the spsc_queue
Chris@101 927 *
Chris@101 928 * \note Thread-safe and wait-free, should only be called from the producer thread
Chris@101 929 * */
Chris@101 930 size_type write_available() const
Chris@101 931 {
Chris@101 932 return base_type::write_available(base_type::max_number_of_elements());
Chris@101 933 }
Chris@101 934
Chris@101 935 /** get reference to element in the front of the queue
Chris@101 936 *
Chris@101 937 * Availability of front element can be checked using read_available().
Chris@101 938 *
Chris@101 939 * \pre only one thread is allowed to check front element
Chris@101 940 * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
Chris@101 941 * \return reference to the first element in the queue
Chris@101 942 *
Chris@101 943 * \note Thread-safe and wait-free
Chris@101 944 */
Chris@101 945 const T& front() const
Chris@101 946 {
Chris@101 947 BOOST_ASSERT(read_available() > 0);
Chris@101 948 return base_type::front();
Chris@101 949 }
Chris@101 950
Chris@101 951 /// \copydoc boost::lockfree::spsc_queue::front() const
Chris@101 952 T& front()
Chris@101 953 {
Chris@101 954 BOOST_ASSERT(read_available() > 0);
Chris@101 955 return base_type::front();
Chris@101 956 }
Chris@101 957
Chris@101 958 /** reset the ringbuffer
Chris@101 959 *
Chris@101 960 * \note Not thread-safe
Chris@101 961 * */
Chris@101 962 void reset(void)
Chris@101 963 {
Chris@101 964 if ( !boost::has_trivial_destructor<T>::value ) {
Chris@101 965 // make sure to call all destructors!
Chris@101 966
Chris@101 967 T dummy_element;
Chris@101 968 while (pop(dummy_element))
Chris@101 969 {}
Chris@101 970 } else {
Chris@101 971 base_type::write_index_.store(0, memory_order_relaxed);
Chris@101 972 base_type::read_index_.store(0, memory_order_release);
Chris@101 973 }
Chris@101 974 }
Chris@16 975 };
Chris@16 976
Chris@16 977 } /* namespace lockfree */
Chris@16 978 } /* namespace boost */
Chris@16 979
Chris@16 980
Chris@16 981 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */