Mercurial > hg > svcore
comparison base/RingBuffer.h @ 835:1d439494604c
Memory barriers in ringbuffer
author | Chris Cannam |
---|---|
date | Mon, 16 Sep 2013 15:47:27 +0100 |
parents | 2ba202c5be8d |
children | 6a94bb528e9d |
comparison
equal
deleted
inserted
replaced
834:fcee7e040ab4 | 835:1d439494604c |
---|---|
22 #define _RINGBUFFER_H_ | 22 #define _RINGBUFFER_H_ |
23 | 23 |
24 #include <sys/types.h> | 24 #include <sys/types.h> |
25 | 25 |
26 #include "system/System.h" | 26 #include "system/System.h" |
27 #include "Scavenger.h" | |
28 | 27 |
29 #include <cstring> // memcpy, memset &c | 28 #include <cstring> // memcpy, memset &c |
30 | 29 |
31 //#define DEBUG_RINGBUFFER 1 | 30 //#define DEBUG_RINGBUFFER 1 |
32 | 31 |
65 * (This is the argument n passed to the constructor.) | 64 * (This is the argument n passed to the constructor.) |
66 */ | 65 */ |
67 size_t getSize() const; | 66 size_t getSize() const; |
68 | 67 |
69 /** | 68 /** |
70 * Resize the ring buffer. This also empties it. Actually swaps | 69 * Return a new ring buffer (allocated with "new" -- caller must |
71 * in a new, larger buffer; the old buffer is scavenged after a | 70 * delete when no longer needed) of the given size, containing the |
72 * seemly delay. Should be called from the write thread. | 71 * same data as this one as perceived by reader 0 of this buffer. |
73 */ | 72 * If another thread reads from or writes to this buffer during |
74 void resize(size_t newSize); | 73 * the call, the contents of the new buffer may be incomplete or |
 | 74 * inconsistent. If this buffer's data will not fit in the new |
 | 75 * size, the contents are undefined. |
 | 76 */ |
 | 77 RingBuffer<T, N> *resized(size_t newSize) const; |
75 | 78 |
76 /** | 79 /** |
77 * Lock the ring buffer into physical memory. Returns true | 80 * Lock the ring buffer into physical memory. Returns true |
78 * for success. | 81 * for success. |
79 */ | 82 */ |
165 size_t m_writer; | 168 size_t m_writer; |
166 size_t *m_readers; | 169 size_t *m_readers; |
167 size_t m_size; | 170 size_t m_size; |
168 size_t m_spare; | 171 size_t m_spare; |
169 | 172 |
170 static Scavenger<ScavengerArrayWrapper<T> > m_scavenger; | |
171 | |
172 private: | 173 private: |
173 RingBuffer(const RingBuffer &); // not provided | 174 RingBuffer(const RingBuffer &); // not provided |
174 RingBuffer &operator=(const RingBuffer &); // not provided | 175 RingBuffer &operator=(const RingBuffer &); // not provided |
175 }; | 176 }; |
176 | |
177 template <typename T, int N> | |
178 Scavenger<ScavengerArrayWrapper<T> > RingBuffer<T, N>::m_scavenger; | |
179 | 177 |
180 template <typename T, int N> | 178 template <typename T, int N> |
181 RingBuffer<T, N>::RingBuffer(size_t n) : | 179 RingBuffer<T, N>::RingBuffer(size_t n) : |
182 m_buffer(new T[n + 1]), | 180 m_buffer(new T[n + 1]), |
183 m_mlocked(false), | 181 m_mlocked(false), |
198 std::cerr << "&m_readers = " << &m_readers << std::endl; | 196 std::cerr << "&m_readers = " << &m_readers << std::endl; |
199 std::cerr << "&m_size = " << &m_size << std::endl; | 197 std::cerr << "&m_size = " << &m_size << std::endl; |
200 */ | 198 */ |
201 | 199 |
202 for (int i = 0; i < N; ++i) m_readers[i] = 0; | 200 for (int i = 0; i < N; ++i) m_readers[i] = 0; |
203 | |
204 m_scavenger.scavenge(); | |
205 } | 201 } |
206 | 202 |
207 template <typename T, int N> | 203 template <typename T, int N> |
208 RingBuffer<T, N>::~RingBuffer() | 204 RingBuffer<T, N>::~RingBuffer() |
209 { | 205 { |
215 | 211 |
216 if (m_mlocked) { | 212 if (m_mlocked) { |
217 MUNLOCK((void *)m_buffer, m_size * sizeof(T)); | 213 MUNLOCK((void *)m_buffer, m_size * sizeof(T)); |
218 } | 214 } |
219 delete[] m_buffer; | 215 delete[] m_buffer; |
220 | |
221 m_scavenger.scavenge(); | |
222 } | 216 } |
223 | 217 |
224 template <typename T, int N> | 218 template <typename T, int N> |
225 size_t | 219 size_t |
226 RingBuffer<T, N>::getSize() const | 220 RingBuffer<T, N>::getSize() const |
231 | 225 |
232 return m_size - 1; | 226 return m_size - 1; |
233 } | 227 } |
234 | 228 |
235 template <typename T, int N> | 229 template <typename T, int N> |
236 void | 230 RingBuffer<T, N> * |
237 RingBuffer<T, N>::resize(size_t newSize) | 231 RingBuffer<T, N>::resized(size_t newSize) const |
238 { | 232 { |
239 #ifdef DEBUG_RINGBUFFER | 233 #ifdef DEBUG_RINGBUFFER |
240 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::resize(" << newSize << ")" << std::endl; | 234 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::resized(" << newSize << ")" << std::endl; |
241 #endif | 235 #endif |
242 | 236 |
243 m_scavenger.scavenge(); | 237 RingBuffer<T, N> *newBuffer = new RingBuffer<T, N>(newSize); |
244 | 238 |
245 if (m_mlocked) { | 239 int w = m_writer; |
246 MUNLOCK((void *)m_buffer, m_size * sizeof(T)); | 240 int r = m_readers[0]; |
247 } | 241 |
248 | 242 while (r != w) { |
249 m_scavenger.claim(new ScavengerArrayWrapper<T>(m_buffer)); | 243 T value = m_buffer[r]; |
250 | 244 newBuffer->write(&value, 1); |
251 reset(); | 245 if (++r == m_size) r = 0; |
252 m_buffer = new T[newSize + 1]; | 246 } |
253 m_size = newSize + 1; | 247 |
254 | 248 return newBuffer; |
255 if (m_mlocked) { | |
256 if (MLOCK((void *)m_buffer, m_size * sizeof(T))) { | |
257 m_mlocked = false; | |
258 } | |
259 } | |
260 } | 249 } |
261 | 250 |
262 template <typename T, int N> | 251 template <typename T, int N> |
263 bool | 252 bool |
264 RingBuffer<T, N>::mlock() | 253 RingBuffer<T, N>::mlock() |
348 } else { | 337 } else { |
349 memcpy(destination, m_buffer + m_readers[R], here * sizeof(T)); | 338 memcpy(destination, m_buffer + m_readers[R], here * sizeof(T)); |
350 memcpy(destination + here, m_buffer, (n - here) * sizeof(T)); | 339 memcpy(destination + here, m_buffer, (n - here) * sizeof(T)); |
351 } | 340 } |
352 | 341 |
 | 342 MBARRIER(); |
353 m_readers[R] = (m_readers[R] + n) % m_size; | 343 m_readers[R] = (m_readers[R] + n) % m_size; |
354 | 344 |
355 #ifdef DEBUG_RINGBUFFER | 345 #ifdef DEBUG_RINGBUFFER |
356 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::read: read " << n << ", reader now " << m_readers[R] << std::endl; | 346 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::read: read " << n << ", reader now " << m_readers[R] << std::endl; |
357 #endif | 347 #endif |
390 for (size_t i = 0; i < (n - here); ++i) { | 380 for (size_t i = 0; i < (n - here); ++i) { |
391 destination[i + here] += m_buffer[i]; | 381 destination[i + here] += m_buffer[i]; |
392 } | 382 } |
393 } | 383 } |
394 | 384 |
 | 385 MBARRIER(); |
395 m_readers[R] = (m_readers[R] + n) % m_size; | 386 m_readers[R] = (m_readers[R] + n) % m_size; |
396 return n; | 387 return n; |
397 } | 388 } |
398 | 389 |
399 template <typename T, int N> | 390 template <typename T, int N> |
412 T t; | 403 T t; |
413 memset(&t, 0, sizeof(T)); | 404 memset(&t, 0, sizeof(T)); |
414 return t; | 405 return t; |
415 } | 406 } |
416 T value = m_buffer[m_readers[R]]; | 407 T value = m_buffer[m_readers[R]]; |
 | 408 MBARRIER(); |
417 if (++m_readers[R] == m_size) m_readers[R] = 0; | 409 if (++m_readers[R] == m_size) m_readers[R] = 0; |
418 return value; | 410 return value; |
419 } | 411 } |
420 | 412 |
421 template <typename T, int N> | 413 template <typename T, int N> |
518 } else { | 510 } else { |
519 memcpy(m_buffer + m_writer, source, here * sizeof(T)); | 511 memcpy(m_buffer + m_writer, source, here * sizeof(T)); |
520 memcpy(m_buffer, source + here, (n - here) * sizeof(T)); | 512 memcpy(m_buffer, source + here, (n - here) * sizeof(T)); |
521 } | 513 } |
522 | 514 |
 | 515 MBARRIER(); |
523 m_writer = (m_writer + n) % m_size; | 516 m_writer = (m_writer + n) % m_size; |
524 | 517 |
525 #ifdef DEBUG_RINGBUFFER | 518 #ifdef DEBUG_RINGBUFFER |
526 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::write: wrote " << n << ", writer now " << m_writer << std::endl; | 519 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::write: wrote " << n << ", writer now " << m_writer << std::endl; |
527 #endif | 520 #endif |
552 memset(m_buffer + m_writer, 0, n * sizeof(T)); | 545 memset(m_buffer + m_writer, 0, n * sizeof(T)); |
553 } else { | 546 } else { |
554 memset(m_buffer + m_writer, 0, here * sizeof(T)); | 547 memset(m_buffer + m_writer, 0, here * sizeof(T)); |
555 memset(m_buffer, 0, (n - here) * sizeof(T)); | 548 memset(m_buffer, 0, (n - here) * sizeof(T)); |
556 } | 549 } |
557 | 550 |
 | 551 MBARRIER(); |
558 m_writer = (m_writer + n) % m_size; | 552 m_writer = (m_writer + n) % m_size; |
559 return n; | 553 return n; |
560 } | 554 } |
561 | 555 |
562 #endif // _RINGBUFFER_H_ | 556 #endif // _RINGBUFFER_H_ |