diff data/fft/FFTDataServer.cpp @ 537:3cc4b7cd2aa5

* Merge from one-fftdataserver-per-fftmodel branch. This bit of reworking (which is not described very accurately by the title of the branch) turns the MatrixFile object into something that either reads or writes, but not both, and separates the FFT file cache reader and writer implementations separately. This allows the FFT data server to have a single thread owning writers and one reader per "customer" thread, and for all locking to be vastly simplified and concentrated in the data server alone (because none of the classes it makes use of is used in more than one thread at a time). The result is faster and more trustworthy code.
author Chris Cannam
date Tue, 27 Jan 2009 13:25:10 +0000
parents beb51f558e9c
children 87aef350f1dc
line wrap: on
line diff
--- a/data/fft/FFTDataServer.cpp	Mon Jan 26 15:18:32 2009 +0000
+++ b/data/fft/FFTDataServer.cpp	Tue Jan 27 13:25:10 2009 +0000
@@ -15,7 +15,8 @@
 
 #include "FFTDataServer.h"
 
-#include "FFTFileCache.h"
+#include "FFTFileCacheReader.h"
+#include "FFTFileCacheWriter.h"
 #include "FFTMemoryCache.h"
 
 #include "model/DenseTimeValueModel.h"
@@ -27,7 +28,9 @@
 #include "base/Profiler.h"
 #include "base/Thread.h" // for debug mutex locker
 
-//#define DEBUG_FFT_SERVER 1
+#include <QWriteLocker>
+
+#define DEBUG_FFT_SERVER 1
 //#define DEBUG_FFT_SERVER_FILL 1
 
 #ifdef DEBUG_FFT_SERVER_FILL
@@ -326,6 +329,7 @@
                 std::cerr << "ERROR: FFTDataServer::releaseInstance("
                           << server << "): instance not allocated" << std::endl;
             } else if (--i->second.second == 0) {
+/*!!!
                 if (server->m_lastUsedCache == -1) { // never used
 #ifdef DEBUG_FFT_SERVER
                     std::cerr << "FFTDataServer::releaseInstance: instance "
@@ -335,6 +339,7 @@
                     delete server;
                     m_servers.erase(i);
                 } else {
+*/
 #ifdef DEBUG_FFT_SERVER
                     std::cerr << "FFTDataServer::releaseInstance: instance "
                               << server << " no longer in use, marking for possible collection"
@@ -353,7 +358,7 @@
                     if (!found) m_releasedServers.push_back(server);
                     server->suspend();
                     purgeLimbo();
-                }
+//!!!                }
             } else {
 #ifdef DEBUG_FFT_SERVER
                     std::cerr << "FFTDataServer::releaseInstance: instance "
@@ -497,7 +502,6 @@
     m_cacheWidth(0),
     m_cacheWidthPower(0),
     m_cacheWidthMask(0),
-    m_lastUsedCache(-1),
     m_criteria(criteria),
     m_fftInput(0),
     m_exiting(false),
@@ -637,9 +641,6 @@
     MutexLocker locker(&m_writeMutex,
                        "FFTDataServer::suspend::m_writeMutex");
     m_suspended = true;
-    for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) {
-        if (*i) (*i)->suspend();
-    }
 }
 
 void
@@ -721,7 +722,7 @@
 
     if ((recommendation & StorageAdviser::UseMemory) ||
         (recommendation & StorageAdviser::PreferMemory)) {
-        memoryCache = true;
+//!!!        memoryCache = true;
     }
 
     compactCache = canCompact &&
@@ -734,63 +735,21 @@
 #endif
 }
 
-FFTCache *
-FFTDataServer::getCacheAux(size_t c)
+bool
+FFTDataServer::makeCache(int c)
 {
-    Profiler profiler("FFTDataServer::getCacheAux", false);
-#ifdef DEBUG_FFT_SERVER
-    std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "])::getCacheAux" << std::endl;
-#endif
+    QWriteLocker locker(&m_cacheVectorLock);
 
-    MutexLocker locker(&m_writeMutex,
-                       "FFTDataServer::getCacheAux::m_writeMutex");
-
-    if (m_lastUsedCache == -1) {
-        m_fillThread->start();
+    if (m_caches[c]) {
+        // someone else must have created the cache between our
+        // testing for it and taking the write lock
+        return true;
     }
 
-    if (int(c) != m_lastUsedCache) {
-
-#ifdef DEBUG_FFT_SERVER
-        std::cerr << "switch from " << m_lastUsedCache << " to " << c << std::endl;
-#endif
-
-        for (IntQueue::iterator i = m_dormantCaches.begin();
-             i != m_dormantCaches.end(); ++i) {
-            if (*i == int(c)) {
-                m_dormantCaches.erase(i);
-                break;
-            }
-        }
-
-        if (m_lastUsedCache >= 0) {
-            bool inDormant = false;
-            for (size_t i = 0; i < m_dormantCaches.size(); ++i) {
-                if (m_dormantCaches[i] == m_lastUsedCache) {
-                    inDormant = true;
-                    break;
-                }
-            }
-            if (!inDormant) {
-                m_dormantCaches.push_back(m_lastUsedCache);
-            }
-            while (m_dormantCaches.size() > 4) {
-                int dc = m_dormantCaches.front();
-                m_dormantCaches.pop_front();
-                m_caches[dc]->suspend();
-            }
-        }
-    }
-
-    if (m_caches[c]) {
-        m_lastUsedCache = c;
-        return m_caches[c];
-    }
+    CacheBlock *cb = new CacheBlock;
 
     QString name = QString("%1-%2").arg(m_fileBaseName).arg(c);
 
-    FFTCache *cache = 0;
-
     size_t width = m_cacheWidth;
     if (c * m_cacheWidth + width > m_width) {
         width = m_width - c * m_cacheWidth;
@@ -801,73 +760,111 @@
 
     getStorageAdvice(width, m_height, memoryCache, compactCache);
 
-    try {
+    bool success = false;
 
-        if (memoryCache) {
+    if (memoryCache) {
 
-            cache = new FFTMemoryCache
-                (compactCache ? FFTMemoryCache::Compact :
-                      m_polar ? FFTMemoryCache::Polar :
-                                FFTMemoryCache::Rectangular);
+        try {
 
-        } else {
+            cb->memoryCache = new FFTMemoryCache
+                (compactCache ? FFTCache::Compact :
+                      m_polar ? FFTCache::Polar :
+                                FFTCache::Rectangular,
+                 width, m_height);
 
-            cache = new FFTFileCache
-                (name,
-                 MatrixFile::ReadWrite,
-                 compactCache ? FFTFileCache::Compact :
-                      m_polar ? FFTFileCache::Polar :
-                                FFTFileCache::Rectangular);
-        }
+            success = true;
 
-        cache->resize(width, m_height);
-        cache->reset();
+        } catch (std::bad_alloc) {
 
-    } catch (std::bad_alloc) {
-
-        delete cache;
-        cache = 0;
-
-        if (memoryCache) {
+            delete cb->memoryCache;
+            cb->memoryCache = 0;
             
-            std::cerr << "WARNING: Memory allocation failed when resizing"
-                      << " FFT memory cache no. " << c << " to " << width 
+            std::cerr << "WARNING: Memory allocation failed when creating"
+                      << " FFT memory cache no. " << c << " of " << width 
                       << "x" << m_height << " (of total width " << m_width
                       << "): falling back to disc cache" << std::endl;
 
-            try {
-
-                purgeLimbo(0);
-
-                cache = new FFTFileCache(name,
-                                         MatrixFile::ReadWrite,
-                                         FFTFileCache::Compact);
-
-                cache->resize(width, m_height);
-                cache->reset();
-
-            } catch (std::bad_alloc) {
-
-                delete cache;
-                cache = 0;
-            }
-        }
-
-        if (!cache) {
-            std::cerr << "ERROR: Memory allocation failed when resizing"
-                      << " FFT file cache no. " << c << " to " << width
-                      << "x" << m_height << " (of total width " << m_width
-                      << "): abandoning this cache" << std::endl;
-
-            throw AllocationFailed("Failed to create or resize an FFT model slice");
+            memoryCache = false;
         }
     }
 
-    m_caches[c] = cache;
-    m_lastUsedCache = c;
-    return cache;
+    if (!memoryCache) {
+
+        try {
+        
+            cb->fileCacheWriter = new FFTFileCacheWriter
+                (name,
+                 compactCache ? FFTCache::Compact :
+                      m_polar ? FFTCache::Polar :
+                                FFTCache::Rectangular,
+                 width, m_height);
+
+            success = true;
+
+        } catch (std::exception e) {
+
+            delete cb->fileCacheWriter;
+            cb->fileCacheWriter = 0;
+            
+            std::cerr << "ERROR: Failed to construct disc cache for FFT data: "
+                      << e.what() << std::endl;
+        }
+    }
+
+    m_caches[c] = cb;
+
+    return success;
 }
+ 
+bool
+FFTDataServer::makeCacheReader(int c)
+{
+    // preconditions: m_caches[c] exists and contains a file writer;
+    // m_cacheVectorLock is not locked by this thread
+#ifdef DEBUG_FFT_SERVER
+    std::cerr << "FFTDataServer::makeCacheReader(" << c << ")" << std::endl;
+#endif
 
+    QThread *me = QThread::currentThread();
+    QWriteLocker locker(&m_cacheVectorLock);
+    CacheBlock *cb(m_caches.at(c));
+    if (!cb || !cb->fileCacheWriter) return false;
+
+    try {
+        
+        cb->fileCacheReader[me] = new FFTFileCacheReader(cb->fileCacheWriter);
+
+    } catch (std::exception e) {
+
+        delete cb->fileCacheReader[me];
+        cb->fileCacheReader.erase(me);
+            
+        std::cerr << "ERROR: Failed to construct disc cache reader for FFT data: "
+                  << e.what() << std::endl;
+        return false;
+    }
+
+    // erase a reader that looks like it may no longer going to be
+    // used by this thread for a while (leaving alone the current
+    // and previous cache readers)
+    int deleteCandidate = c - 2;
+    if (deleteCandidate < 0) deleteCandidate = c + 2;
+    if (deleteCandidate >= m_caches.size()) {
+        return true;
+    }
+
+    cb = m_caches.at(deleteCandidate);
+    if (cb && cb->fileCacheReader.find(me) != cb->fileCacheReader.end()) {
+#ifdef DEBUG_FFT_SERVER
+        std::cerr << "FFTDataServer::makeCacheReader: Deleting probably unpopular reader " << deleteCandidate << " for this thread (as I create reader " << c << ")" << std::endl;
+#endif
+        delete cb->fileCacheReader[me];
+        cb->fileCacheReader.erase(me);
+    }
+            
+    return true;
+}
+       
 float
 FFTDataServer::getMagnitudeAt(size_t x, size_t y)
 {
@@ -876,9 +873,10 @@
     if (x >= m_width || y >= m_height) return 0;
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return 0;
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getMagnitudeAt: filling");
 #ifdef DEBUG_FFT_SERVER
@@ -908,9 +906,10 @@
     }
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return false;
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getMagnitudesAt: filling");
         MutexLocker locker(&m_writeMutex,
@@ -931,9 +930,10 @@
     if (x >= m_width || y >= m_height) return 0;
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return 0;
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling");
         // hold mutex so that write thread doesn't mess with class
@@ -959,9 +959,10 @@
     }
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return false;
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling");
         MutexLocker locker(&m_writeMutex,
@@ -984,9 +985,10 @@
     if (x >= m_width) return 0;
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return 0;
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling");
         // hold mutex so that write thread doesn't mess with class
@@ -1006,9 +1008,10 @@
     if (x >= m_width || y >= m_height) return 0;
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return 0;
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getPhaseAt: filling");
         // hold mutex so that write thread doesn't mess with class
@@ -1034,9 +1037,10 @@
     }
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return false;
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getPhasesAt: filling");
         MutexLocker locker(&m_writeMutex,
@@ -1063,7 +1067,7 @@
     }
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
 
     if (!cache) {
         real = 0;
@@ -1071,6 +1075,7 @@
         return;
     }
 
+    //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getValuesAt: filling");
 #ifdef DEBUG_FFT_SERVER
@@ -1094,6 +1099,7 @@
     if (x >= m_width) return true;
 
     if (!haveCache(x)) {
+/*!!!
         if (m_lastUsedCache == -1) {
             if (m_suspended) {
                 std::cerr << "FFTDataServer::isColumnReady(" << x << "): no cache, calling resume" << std::endl;
@@ -1101,13 +1107,15 @@
             }
             m_fillThread->start();
         }
+*/
         return false;
     }
 
     size_t col;
-    FFTCache *cache = getCache(x, col);
+    FFTCacheReader *cache = getCacheReader(x, col);
     if (!cache) return true;
 
+    //!!! n.b. can throw
     return cache->haveSetColumnAt(col);
 }    
 
@@ -1133,7 +1141,6 @@
         std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
                   << "x > width (" << x << " > " << m_width << ")"
                   << std::endl;
-//        abort(); //!!!
         return;
     }
 
@@ -1141,16 +1148,9 @@
 #ifdef DEBUG_FFT_SERVER_FILL
     std::cout << "FFTDataServer::fillColumn(" << x << ")" << std::endl;
 #endif
-    FFTCache *cache = getCache(x, col);
+    FFTCacheWriter *cache = getCacheWriter(x, col);
     if (!cache) return;
 
-    {
-        MutexLocker locker(lockHeld ? 0 : &m_writeMutex,
-                           "FFTDataServer::fillColumn::m_writeMutex [1]");
-
-        if (cache->haveSetColumnAt(col)) return;
-    }
-
     int winsize = m_windowSize;
     int fftsize = m_fftSize;
     int hs = fftsize/2;
@@ -1271,6 +1271,19 @@
     }
 }    
 
+void
+FFTDataServer::fillComplete()
+{
+    for (int i = 0; i < int(m_caches.size()); ++i) {
+        if (m_caches[i]->memoryCache) {
+            m_caches[i]->memoryCache->allColumnsWritten();
+        }
+        if (m_caches[i]->fileCacheWriter) {
+            m_caches[i]->fileCacheWriter->allColumnsWritten();
+        }
+    }
+}
+
 size_t
 FFTDataServer::getFillCompletion() const 
 {
@@ -1418,7 +1431,12 @@
         }
     }
 
+    m_server.fillComplete();
     m_completion = 100;
     m_extent = end;
+
+#ifdef DEBUG_FFT_SERVER
+    std::cerr << "FFTDataServer::FillThread::run exiting" << std::endl;
+#endif
 }