Mercurial > hg > svcore
diff data/fft/FFTDataServer.cpp @ 537:3cc4b7cd2aa5
* Merge from one-fftdataserver-per-fftmodel branch. This bit of
reworking (which is not described very accurately by the title of
the branch) turns the MatrixFile object into something that either
reads or writes, but not both, and separates the FFT file cache
reader and writer implementations separately. This allows the
FFT data server to have a single thread owning writers and one reader
per "customer" thread, and for all locking to be vastly simplified
and concentrated in the data server alone (because none of the
classes it makes use of is used in more than one thread at a time).
The result is faster and more trustworthy code.
author | Chris Cannam |
---|---|
date | Tue, 27 Jan 2009 13:25:10 +0000 |
parents | beb51f558e9c |
children | 87aef350f1dc |
line wrap: on
line diff
--- a/data/fft/FFTDataServer.cpp Mon Jan 26 15:18:32 2009 +0000 +++ b/data/fft/FFTDataServer.cpp Tue Jan 27 13:25:10 2009 +0000 @@ -15,7 +15,8 @@ #include "FFTDataServer.h" -#include "FFTFileCache.h" +#include "FFTFileCacheReader.h" +#include "FFTFileCacheWriter.h" #include "FFTMemoryCache.h" #include "model/DenseTimeValueModel.h" @@ -27,7 +28,9 @@ #include "base/Profiler.h" #include "base/Thread.h" // for debug mutex locker -//#define DEBUG_FFT_SERVER 1 +#include <QWriteLocker> + +#define DEBUG_FFT_SERVER 1 //#define DEBUG_FFT_SERVER_FILL 1 #ifdef DEBUG_FFT_SERVER_FILL @@ -326,6 +329,7 @@ std::cerr << "ERROR: FFTDataServer::releaseInstance(" << server << "): instance not allocated" << std::endl; } else if (--i->second.second == 0) { +/*!!! if (server->m_lastUsedCache == -1) { // never used #ifdef DEBUG_FFT_SERVER std::cerr << "FFTDataServer::releaseInstance: instance " @@ -335,6 +339,7 @@ delete server; m_servers.erase(i); } else { +*/ #ifdef DEBUG_FFT_SERVER std::cerr << "FFTDataServer::releaseInstance: instance " << server << " no longer in use, marking for possible collection" @@ -353,7 +358,7 @@ if (!found) m_releasedServers.push_back(server); server->suspend(); purgeLimbo(); - } +//!!! } } else { #ifdef DEBUG_FFT_SERVER std::cerr << "FFTDataServer::releaseInstance: instance " @@ -497,7 +502,6 @@ m_cacheWidth(0), m_cacheWidthPower(0), m_cacheWidthMask(0), - m_lastUsedCache(-1), m_criteria(criteria), m_fftInput(0), m_exiting(false), @@ -637,9 +641,6 @@ MutexLocker locker(&m_writeMutex, "FFTDataServer::suspend::m_writeMutex"); m_suspended = true; - for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) { - if (*i) (*i)->suspend(); - } } void @@ -721,7 +722,7 @@ if ((recommendation & StorageAdviser::UseMemory) || (recommendation & StorageAdviser::PreferMemory)) { - memoryCache = true; +//!!! memoryCache = true; } compactCache = canCompact && @@ -734,63 +735,21 @@ #endif } -FFTCache * -FFTDataServer::getCacheAux(size_t c) +bool +FFTDataServer::makeCache(int c) { - Profiler profiler("FFTDataServer::getCacheAux", false); -#ifdef DEBUG_FFT_SERVER - std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "])::getCacheAux" << std::endl; -#endif + QWriteLocker locker(&m_cacheVectorLock); - MutexLocker locker(&m_writeMutex, - "FFTDataServer::getCacheAux::m_writeMutex"); - - if (m_lastUsedCache == -1) { - m_fillThread->start(); + if (m_caches[c]) { + // someone else must have created the cache between our + // testing for it and taking the write lock + return true; } - if (int(c) != m_lastUsedCache) { - -#ifdef DEBUG_FFT_SERVER - std::cerr << "switch from " << m_lastUsedCache << " to " << c << std::endl; -#endif - - for (IntQueue::iterator i = m_dormantCaches.begin(); - i != m_dormantCaches.end(); ++i) { - if (*i == int(c)) { - m_dormantCaches.erase(i); - break; - } - } - - if (m_lastUsedCache >= 0) { - bool inDormant = false; - for (size_t i = 0; i < m_dormantCaches.size(); ++i) { - if (m_dormantCaches[i] == m_lastUsedCache) { - inDormant = true; - break; - } - } - if (!inDormant) { - m_dormantCaches.push_back(m_lastUsedCache); - } - while (m_dormantCaches.size() > 4) { - int dc = m_dormantCaches.front(); - m_dormantCaches.pop_front(); - m_caches[dc]->suspend(); - } - } - } - - if (m_caches[c]) { - m_lastUsedCache = c; - return m_caches[c]; - } + CacheBlock *cb = new CacheBlock; QString name = QString("%1-%2").arg(m_fileBaseName).arg(c); - FFTCache *cache = 0; - size_t width = m_cacheWidth; if (c * m_cacheWidth + width > m_width) { width = m_width - c * m_cacheWidth; @@ -801,73 +760,111 @@ getStorageAdvice(width, m_height, memoryCache, compactCache); - try { + bool success = false; - if (memoryCache) { + if (memoryCache) { - cache = new FFTMemoryCache - (compactCache ? FFTMemoryCache::Compact : - m_polar ? FFTMemoryCache::Polar : - FFTMemoryCache::Rectangular); + try { - } else { + cb->memoryCache = new FFTMemoryCache + (compactCache ? FFTCache::Compact : + m_polar ? FFTCache::Polar : + FFTCache::Rectangular, + width, m_height); - cache = new FFTFileCache - (name, - MatrixFile::ReadWrite, - compactCache ? FFTFileCache::Compact : - m_polar ? FFTFileCache::Polar : - FFTFileCache::Rectangular); - } + success = true; - cache->resize(width, m_height); - cache->reset(); + } catch (std::bad_alloc) { - } catch (std::bad_alloc) { - - delete cache; - cache = 0; - - if (memoryCache) { + delete cb->memoryCache; + cb->memoryCache = 0; - std::cerr << "WARNING: Memory allocation failed when resizing" - << " FFT memory cache no. " << c << " to " << width + std::cerr << "WARNING: Memory allocation failed when creating" + << " FFT memory cache no. " << c << " of " << width << "x" << m_height << " (of total width " << m_width << "): falling back to disc cache" << std::endl; - try { - - purgeLimbo(0); - - cache = new FFTFileCache(name, - MatrixFile::ReadWrite, - FFTFileCache::Compact); - - cache->resize(width, m_height); - cache->reset(); - - } catch (std::bad_alloc) { - - delete cache; - cache = 0; - } - } - - if (!cache) { - std::cerr << "ERROR: Memory allocation failed when resizing" - << " FFT file cache no. " << c << " to " << width - << "x" << m_height << " (of total width " << m_width - << "): abandoning this cache" << std::endl; - - throw AllocationFailed("Failed to create or resize an FFT model slice"); + memoryCache = false; } } - m_caches[c] = cache; - m_lastUsedCache = c; - return cache; + if (!memoryCache) { + + try { + + cb->fileCacheWriter = new FFTFileCacheWriter + (name, + compactCache ? FFTCache::Compact : + m_polar ? FFTCache::Polar : + FFTCache::Rectangular, + width, m_height); + + success = true; + + } catch (std::exception e) { + + delete cb->fileCacheWriter; + cb->fileCacheWriter = 0; + + std::cerr << "ERROR: Failed to construct disc cache for FFT data: " + << e.what() << std::endl; + } + } + + m_caches[c] = cb; + + return success; } + +bool +FFTDataServer::makeCacheReader(int c) +{ + // preconditions: m_caches[c] exists and contains a file writer; + // m_cacheVectorLock is not locked by this thread +#ifdef DEBUG_FFT_SERVER + std::cerr << "FFTDataServer::makeCacheReader(" << c << ")" << std::endl; +#endif + QThread *me = QThread::currentThread(); + QWriteLocker locker(&m_cacheVectorLock); + CacheBlock *cb(m_caches.at(c)); + if (!cb || !cb->fileCacheWriter) return false; + + try { + + cb->fileCacheReader[me] = new FFTFileCacheReader(cb->fileCacheWriter); + + } catch (std::exception e) { + + delete cb->fileCacheReader[me]; + cb->fileCacheReader.erase(me); + + std::cerr << "ERROR: Failed to construct disc cache reader for FFT data: " + << e.what() << std::endl; + return false; + } + + // erase a reader that looks like it may no longer going to be + // used by this thread for a while (leaving alone the current + // and previous cache readers) + int deleteCandidate = c - 2; + if (deleteCandidate < 0) deleteCandidate = c + 2; + if (deleteCandidate >= m_caches.size()) { + return true; + } + + cb = m_caches.at(deleteCandidate); + if (cb && cb->fileCacheReader.find(me) != cb->fileCacheReader.end()) { +#ifdef DEBUG_FFT_SERVER + std::cerr << "FFTDataServer::makeCacheReader: Deleting probably unpopular reader " << deleteCandidate << " for this thread (as I create reader " << c << ")" << std::endl; +#endif + delete cb->fileCacheReader[me]; + cb->fileCacheReader.erase(me); + } + + return true; +} + float FFTDataServer::getMagnitudeAt(size_t x, size_t y) { @@ -876,9 +873,10 @@ if (x >= m_width || y >= m_height) return 0; size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return 0; + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getMagnitudeAt: filling"); #ifdef DEBUG_FFT_SERVER @@ -908,9 +906,10 @@ } size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return false; + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getMagnitudesAt: filling"); MutexLocker locker(&m_writeMutex, @@ -931,9 +930,10 @@ if (x >= m_width || y >= m_height) return 0; size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return 0; + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling"); // hold mutex so that write thread doesn't mess with class @@ -959,9 +959,10 @@ } size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return false; + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling"); MutexLocker locker(&m_writeMutex, @@ -984,9 +985,10 @@ if (x >= m_width) return 0; size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return 0; + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling"); // hold mutex so that write thread doesn't mess with class @@ -1006,9 +1008,10 @@ if (x >= m_width || y >= m_height) return 0; size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return 0; + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getPhaseAt: filling"); // hold mutex so that write thread doesn't mess with class @@ -1034,9 +1037,10 @@ } size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return false; + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getPhasesAt: filling"); MutexLocker locker(&m_writeMutex, @@ -1063,7 +1067,7 @@ } size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) { real = 0; @@ -1071,6 +1075,7 @@ return; } + //!!! n.b. can throw if (!cache->haveSetColumnAt(col)) { Profiler profiler("FFTDataServer::getValuesAt: filling"); #ifdef DEBUG_FFT_SERVER @@ -1094,6 +1099,7 @@ if (x >= m_width) return true; if (!haveCache(x)) { +/*!!! if (m_lastUsedCache == -1) { if (m_suspended) { std::cerr << "FFTDataServer::isColumnReady(" << x << "): no cache, calling resume" << std::endl; @@ -1101,13 +1107,15 @@ } m_fillThread->start(); } +*/ return false; } size_t col; - FFTCache *cache = getCache(x, col); + FFTCacheReader *cache = getCacheReader(x, col); if (!cache) return true; + //!!! n.b. can throw return cache->haveSetColumnAt(col); } @@ -1133,7 +1141,6 @@ std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " << "x > width (" << x << " > " << m_width << ")" << std::endl; -// abort(); //!!! return; } @@ -1141,16 +1148,9 @@ #ifdef DEBUG_FFT_SERVER_FILL std::cout << "FFTDataServer::fillColumn(" << x << ")" << std::endl; #endif - FFTCache *cache = getCache(x, col); + FFTCacheWriter *cache = getCacheWriter(x, col); if (!cache) return; - { - MutexLocker locker(lockHeld ? 0 : &m_writeMutex, - "FFTDataServer::fillColumn::m_writeMutex [1]"); - - if (cache->haveSetColumnAt(col)) return; - } - int winsize = m_windowSize; int fftsize = m_fftSize; int hs = fftsize/2; @@ -1271,6 +1271,19 @@ } } +void +FFTDataServer::fillComplete() +{ + for (int i = 0; i < int(m_caches.size()); ++i) { + if (m_caches[i]->memoryCache) { + m_caches[i]->memoryCache->allColumnsWritten(); + } + if (m_caches[i]->fileCacheWriter) { + m_caches[i]->fileCacheWriter->allColumnsWritten(); + } + } +} + size_t FFTDataServer::getFillCompletion() const { @@ -1418,7 +1431,12 @@ } } + m_server.fillComplete(); m_completion = 100; m_extent = end; + +#ifdef DEBUG_FFT_SERVER + std::cerr << "FFTDataServer::FillThread::run exiting" << std::endl; +#endif }