comparison base/RingBuffer.h @ 835:1d439494604c

Memory barriers in ringbuffer
author Chris Cannam
date Mon, 16 Sep 2013 15:47:27 +0100
parents 2ba202c5be8d
children 6a94bb528e9d
comparison
equal deleted inserted replaced
834:fcee7e040ab4 835:1d439494604c
22 #define _RINGBUFFER_H_ 22 #define _RINGBUFFER_H_
23 23
24 #include <sys/types.h> 24 #include <sys/types.h>
25 25
26 #include "system/System.h" 26 #include "system/System.h"
27 #include "Scavenger.h"
28 27
29 #include <cstring> // memcpy, memset &c 28 #include <cstring> // memcpy, memset &c
30 29
31 //#define DEBUG_RINGBUFFER 1 30 //#define DEBUG_RINGBUFFER 1
32 31
65 * (This is the argument n passed to the constructor.) 64 * (This is the argument n passed to the constructor.)
66 */ 65 */
67 size_t getSize() const; 66 size_t getSize() const;
68 67
69 /** 68 /**
70 * Resize the ring buffer. This also empties it. Actually swaps 69 * Return a new ring buffer (allocated with "new" -- caller must
71 * in a new, larger buffer; the old buffer is scavenged after a 70 * delete when no longer needed) of the given size, containing the
72 * seemly delay. Should be called from the write thread. 71 * same data as this one as perceived by reader 0 of this buffer.
73 */ 72 * If another thread reads from or writes to this buffer during
74 void resize(size_t newSize); 73 * the call, the contents of the new buffer may be incomplete or
74 * inconsistent. If this buffer's data will not fit in the new
75 * size, the contents are undefined.
76 */
77 RingBuffer<T, N> *resized(size_t newSize) const;
75 78
76 /** 79 /**
77 * Lock the ring buffer into physical memory. Returns true 80 * Lock the ring buffer into physical memory. Returns true
78 * for success. 81 * for success.
79 */ 82 */
165 size_t m_writer; 168 size_t m_writer;
166 size_t *m_readers; 169 size_t *m_readers;
167 size_t m_size; 170 size_t m_size;
168 size_t m_spare; 171 size_t m_spare;
169 172
170 static Scavenger<ScavengerArrayWrapper<T> > m_scavenger;
171
172 private: 173 private:
173 RingBuffer(const RingBuffer &); // not provided 174 RingBuffer(const RingBuffer &); // not provided
174 RingBuffer &operator=(const RingBuffer &); // not provided 175 RingBuffer &operator=(const RingBuffer &); // not provided
175 }; 176 };
176
177 template <typename T, int N>
178 Scavenger<ScavengerArrayWrapper<T> > RingBuffer<T, N>::m_scavenger;
179 177
180 template <typename T, int N> 178 template <typename T, int N>
181 RingBuffer<T, N>::RingBuffer(size_t n) : 179 RingBuffer<T, N>::RingBuffer(size_t n) :
182 m_buffer(new T[n + 1]), 180 m_buffer(new T[n + 1]),
183 m_mlocked(false), 181 m_mlocked(false),
198 std::cerr << "&m_readers = " << &m_readers << std::endl; 196 std::cerr << "&m_readers = " << &m_readers << std::endl;
199 std::cerr << "&m_size = " << &m_size << std::endl; 197 std::cerr << "&m_size = " << &m_size << std::endl;
200 */ 198 */
201 199
202 for (int i = 0; i < N; ++i) m_readers[i] = 0; 200 for (int i = 0; i < N; ++i) m_readers[i] = 0;
203
204 m_scavenger.scavenge();
205 } 201 }
206 202
207 template <typename T, int N> 203 template <typename T, int N>
208 RingBuffer<T, N>::~RingBuffer() 204 RingBuffer<T, N>::~RingBuffer()
209 { 205 {
215 211
216 if (m_mlocked) { 212 if (m_mlocked) {
217 MUNLOCK((void *)m_buffer, m_size * sizeof(T)); 213 MUNLOCK((void *)m_buffer, m_size * sizeof(T));
218 } 214 }
219 delete[] m_buffer; 215 delete[] m_buffer;
220
221 m_scavenger.scavenge();
222 } 216 }
223 217
224 template <typename T, int N> 218 template <typename T, int N>
225 size_t 219 size_t
226 RingBuffer<T, N>::getSize() const 220 RingBuffer<T, N>::getSize() const
231 225
232 return m_size - 1; 226 return m_size - 1;
233 } 227 }
234 228
235 template <typename T, int N> 229 template <typename T, int N>
236 void 230 RingBuffer<T, N> *
237 RingBuffer<T, N>::resize(size_t newSize) 231 RingBuffer<T, N>::resized(size_t newSize) const
238 { 232 {
239 #ifdef DEBUG_RINGBUFFER 233 #ifdef DEBUG_RINGBUFFER
240 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::resize(" << newSize << ")" << std::endl; 234 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::resized(" << newSize << ")" << std::endl;
241 #endif 235 #endif
242 236
243 m_scavenger.scavenge(); 237 RingBuffer<T, N> *newBuffer = new RingBuffer<T, N>(newSize);
244 238
245 if (m_mlocked) { 239 int w = m_writer;
246 MUNLOCK((void *)m_buffer, m_size * sizeof(T)); 240 int r = m_readers[0];
247 } 241
248 242 while (r != w) {
249 m_scavenger.claim(new ScavengerArrayWrapper<T>(m_buffer)); 243 T value = m_buffer[r];
250 244 newBuffer->write(&value, 1);
251 reset(); 245 if (++r == m_size) r = 0;
252 m_buffer = new T[newSize + 1]; 246 }
253 m_size = newSize + 1; 247
254 248 return newBuffer;
255 if (m_mlocked) {
256 if (MLOCK((void *)m_buffer, m_size * sizeof(T))) {
257 m_mlocked = false;
258 }
259 }
260 } 249 }
261 250
262 template <typename T, int N> 251 template <typename T, int N>
263 bool 252 bool
264 RingBuffer<T, N>::mlock() 253 RingBuffer<T, N>::mlock()
348 } else { 337 } else {
349 memcpy(destination, m_buffer + m_readers[R], here * sizeof(T)); 338 memcpy(destination, m_buffer + m_readers[R], here * sizeof(T));
350 memcpy(destination + here, m_buffer, (n - here) * sizeof(T)); 339 memcpy(destination + here, m_buffer, (n - here) * sizeof(T));
351 } 340 }
352 341
342 MBARRIER();
353 m_readers[R] = (m_readers[R] + n) % m_size; 343 m_readers[R] = (m_readers[R] + n) % m_size;
354 344
355 #ifdef DEBUG_RINGBUFFER 345 #ifdef DEBUG_RINGBUFFER
356 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::read: read " << n << ", reader now " << m_readers[R] << std::endl; 346 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::read: read " << n << ", reader now " << m_readers[R] << std::endl;
357 #endif 347 #endif
390 for (size_t i = 0; i < (n - here); ++i) { 380 for (size_t i = 0; i < (n - here); ++i) {
391 destination[i + here] += m_buffer[i]; 381 destination[i + here] += m_buffer[i];
392 } 382 }
393 } 383 }
394 384
385 MBARRIER();
395 m_readers[R] = (m_readers[R] + n) % m_size; 386 m_readers[R] = (m_readers[R] + n) % m_size;
396 return n; 387 return n;
397 } 388 }
398 389
399 template <typename T, int N> 390 template <typename T, int N>
412 T t; 403 T t;
413 memset(&t, 0, sizeof(T)); 404 memset(&t, 0, sizeof(T));
414 return t; 405 return t;
415 } 406 }
416 T value = m_buffer[m_readers[R]]; 407 T value = m_buffer[m_readers[R]];
408 MBARRIER();
417 if (++m_readers[R] == m_size) m_readers[R] = 0; 409 if (++m_readers[R] == m_size) m_readers[R] = 0;
418 return value; 410 return value;
419 } 411 }
420 412
421 template <typename T, int N> 413 template <typename T, int N>
518 } else { 510 } else {
519 memcpy(m_buffer + m_writer, source, here * sizeof(T)); 511 memcpy(m_buffer + m_writer, source, here * sizeof(T));
520 memcpy(m_buffer, source + here, (n - here) * sizeof(T)); 512 memcpy(m_buffer, source + here, (n - here) * sizeof(T));
521 } 513 }
522 514
515 MBARRIER();
523 m_writer = (m_writer + n) % m_size; 516 m_writer = (m_writer + n) % m_size;
524 517
525 #ifdef DEBUG_RINGBUFFER 518 #ifdef DEBUG_RINGBUFFER
526 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::write: wrote " << n << ", writer now " << m_writer << std::endl; 519 std::cerr << "RingBuffer<T," << N << ">[" << this << "]::write: wrote " << n << ", writer now " << m_writer << std::endl;
527 #endif 520 #endif
552 memset(m_buffer + m_writer, 0, n * sizeof(T)); 545 memset(m_buffer + m_writer, 0, n * sizeof(T));
553 } else { 546 } else {
554 memset(m_buffer + m_writer, 0, here * sizeof(T)); 547 memset(m_buffer + m_writer, 0, here * sizeof(T));
555 memset(m_buffer, 0, (n - here) * sizeof(T)); 548 memset(m_buffer, 0, (n - here) * sizeof(T));
556 } 549 }
557 550
551 MBARRIER();
558 m_writer = (m_writer + n) % m_size; 552 m_writer = (m_writer + n) % m_size;
559 return n; 553 return n;
560 } 554 }
561 555
562 #endif // _RINGBUFFER_H_ 556 #endif // _RINGBUFFER_H_