diff data/fft/FFTDataServer.cpp @ 549:388afa99d537

* More careful (I hope!) locking
author Chris Cannam
date Thu, 05 Feb 2009 12:53:19 +0000
parents 1469caaa8e67
children 107d3f3705c9
line wrap: on
line diff
--- a/data/fft/FFTDataServer.cpp	Thu Feb 05 12:05:28 2009 +0000
+++ b/data/fft/FFTDataServer.cpp	Thu Feb 05 12:53:19 2009 +0000
@@ -602,11 +602,13 @@
         delete m_fillThread;
     }
 
-    MutexLocker locker(&m_writeMutex,
-                       "FFTDataServer::~FFTDataServer::m_writeMutex");
+//    MutexLocker locker(&m_writeMutex,
+//                       "FFTDataServer::~FFTDataServer::m_writeMutex");
+
+    QMutexLocker mlocker(&m_fftBuffersLock);
+    QWriteLocker wlocker(&m_cacheVectorLock);
 
     for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) {
-
         if (*i) {
             delete *i;
         }
@@ -638,8 +640,7 @@
 #endif
     Profiler profiler("FFTDataServer::suspend", false);
 
-    MutexLocker locker(&m_writeMutex,
-                       "FFTDataServer::suspend::m_writeMutex");
+    QMutexLocker locker(&m_fftBuffersLock);
     m_suspended = true;
 }
 
@@ -899,11 +900,7 @@
         std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn(" 
                   << x << ")" << std::endl;
 #endif
-        // hold mutex so that write thread doesn't mess with class
-        // member data in fillColumn
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getMagnitudeAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }
     return cache->getMagnitudeAt(col, y);
 }
@@ -928,9 +925,7 @@
     //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getMagnitudesAt: filling");
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getMagnitudesAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }
 
     cache->getMagnitudesAt(col, values, minbin, count, step);
@@ -952,11 +947,7 @@
     //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling");
-        // hold mutex so that write thread doesn't mess with class
-        // member data in fillColumn
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getNormalizedMagnitudeAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }
     return cache->getNormalizedMagnitudeAt(col, y);
 }
@@ -981,9 +972,7 @@
     //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling");
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getNormalizedMagnitudesAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }
 
     for (size_t i = 0; i < count; ++i) {
@@ -1007,11 +996,7 @@
     //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling");
-        // hold mutex so that write thread doesn't mess with class
-        // member data in fillColumn
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getMaximumMagnitudeAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }
     return cache->getMaximumMagnitudeAt(col);
 }
@@ -1030,11 +1015,7 @@
     //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getPhaseAt: filling");
-        // hold mutex so that write thread doesn't mess with class
-        // member data in fillColumn
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getPhaseAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }
     return cache->getPhaseAt(col, y);
 }
@@ -1059,9 +1040,7 @@
     //!!! n.b. can throw
     if (!cache->haveSetColumnAt(col)) {
         Profiler profiler("FFTDataServer::getPhasesAt: filling");
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getPhasesAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }
 
     for (size_t i = 0; i < count; ++i) {
@@ -1097,11 +1076,7 @@
 #ifdef DEBUG_FFT_SERVER
         std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl;
 #endif
-        // hold mutex so that write thread doesn't mess with class
-        // member data in fillColumn
-        MutexLocker locker(&m_writeMutex,
-                           "FFTDataServer::getValuesAt: m_writeMutex");
-        fillColumn(x, true);
+        fillColumn(x, cache);
     }        
 
     cache->getValuesAt(col, y, real, imaginary);
@@ -1136,7 +1111,7 @@
 }    
 
 void
-FFTDataServer::fillColumn(size_t x, bool lockHeld)
+FFTDataServer::fillColumn(size_t x, FFTCacheReader *tester)
 {
     Profiler profiler("FFTDataServer::fillColumn", false);
 
@@ -1145,14 +1120,14 @@
                   << x << "): model not yet ready" << std::endl;
         return;
     }
-
+/*
     if (!m_fftInput) {
         std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
                   << "input has already been completed and discarded?"
                   << std::endl;
         return;
     }
-
+*/
     if (x >= m_width) {
         std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
                   << "x > width (" << x << " > " << m_width << ")"
@@ -1180,6 +1155,36 @@
     startFrame -= winsize / 2;
     endFrame   -= winsize / 2;
 
+#ifdef DEBUG_FFT_SERVER_FILL
+    std::cerr << "FFTDataServer::fillColumn: requesting frames "
+              << startFrame + pfx << " -> " << endFrame << " ( = "
+              << endFrame - (startFrame + pfx) << ") at index "
+              << off + pfx << " in buffer of size " << m_fftSize
+              << " with window size " << m_windowSize 
+              << " from channel " << m_channel << std::endl;
+#endif
+
+    QMutexLocker locker(&m_fftBuffersLock);
+
+    if (tester) {
+        // We are being called from a function that wanted to obtain a
+        // column using an FFTCacheReader.  Before calling us, it
+        // checked whether the column was available already, and the
+        // reader reported that it wasn't.  Now we test again with the
+        // mutex held, to avoid a race condition in case another
+        // thread has called fillColumn at the same time.
+        if (tester->haveSetColumnAt(x & m_cacheWidthMask)) {
+            return;
+        }
+    }
+
+    if (!m_fftInput) {
+        std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
+                  << "input has already been completed and discarded?"
+                  << std::endl;
+        return;
+    }
+
     for (int i = 0; i < off; ++i) {
         m_fftInput[i] = 0.0;
     }
@@ -1195,15 +1200,6 @@
 	}
     }
 
-#ifdef DEBUG_FFT_SERVER_FILL
-    std::cerr << "FFTDataServer::fillColumn: requesting frames "
-              << startFrame + pfx << " -> " << endFrame << " ( = "
-              << endFrame - (startFrame + pfx) << ") at index "
-              << off + pfx << " in buffer of size " << m_fftSize
-              << " with window size " << m_windowSize 
-              << " from channel " << m_channel << std::endl;
-#endif
-
     int count = 0;
     if (endFrame > startFrame + pfx) count = endFrame - (startFrame + pfx);
 
@@ -1234,9 +1230,6 @@
 
     fftf_execute(m_fftPlan);
 
-    // If our cache uses polar storage, it's more friendly for us to
-    // do the conversion before taking the write mutex
-
     float factor = 0.f;
 
     if (cache->getStorageType() == FFTCache::Compact ||
@@ -1261,24 +1254,19 @@
 
     Profiler subprof("FFTDataServer::fillColumn: set to cache");
 
-    {
-        MutexLocker locker(lockHeld ? 0 : &m_writeMutex,
-                           "FFTDataServer::fillColumn: m_writeMutex [2]");
+    if (cache->getStorageType() == FFTCache::Compact ||
+        cache->getStorageType() == FFTCache::Polar) {
+            
+        cache->setColumnAt(col,
+                           m_workbuffer,
+                           m_workbuffer + hs + 1,
+                           factor);
 
-        if (cache->getStorageType() == FFTCache::Compact ||
-            cache->getStorageType() == FFTCache::Polar) {
-            
-            cache->setColumnAt(col,
-                               m_workbuffer,
-                               m_workbuffer + hs + 1,
-                               factor);
+    } else {
 
-        } else {
-
-            cache->setColumnAt(col,
-                               m_workbuffer,
-                               m_workbuffer + hs + 1);
-        }
+        cache->setColumnAt(col,
+                           m_workbuffer,
+                           m_workbuffer + hs + 1);
     }
 
     if (m_suspended) {
@@ -1376,8 +1364,7 @@
 
         for (size_t f = m_fillFrom; f < end; f += m_server.m_windowIncrement) {
 	    
-            m_server.fillColumn(int((f - start) / m_server.m_windowIncrement),
-                                false);
+            m_server.fillColumn(int((f - start) / m_server.m_windowIncrement));
 
             if (m_server.m_exiting) return;
 
@@ -1385,10 +1372,10 @@
 #ifdef DEBUG_FFT_SERVER
                 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl;
 #endif
-                {
-                    MutexLocker locker(&m_server.m_writeMutex,
-                                       "FFTDataServer::run::m_writeMutex [1]");
-                    m_server.m_condition.wait(&m_server.m_writeMutex, 10000);
+                MutexLocker locker(&m_server.m_fftBuffersLock,
+                                   "FFTDataServer::run::m_fftBuffersLock [1]");
+                if (m_server.m_suspended && !m_server.m_exiting) {
+                    m_server.m_condition.wait(&m_server.m_fftBuffersLock, 10000);
                 }
 #ifdef DEBUG_FFT_SERVER
                 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): waited" << std::endl;
@@ -1417,8 +1404,7 @@
 
     for (size_t f = start; f < remainingEnd; f += m_server.m_windowIncrement) {
 
-        m_server.fillColumn(int((f - start) / m_server.m_windowIncrement),
-                            false);
+        m_server.fillColumn(int((f - start) / m_server.m_windowIncrement));
 
         if (m_server.m_exiting) return;
 
@@ -1427,9 +1413,11 @@
             std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl;
 #endif
             {
-                MutexLocker locker(&m_server.m_writeMutex,
-                                   "FFTDataServer::run::m_writeMutex [2]");
-                m_server.m_condition.wait(&m_server.m_writeMutex, 10000);
+                MutexLocker locker(&m_server.m_fftBuffersLock,
+                                   "FFTDataServer::run::m_fftBuffersLock [2]");
+                if (m_server.m_suspended && !m_server.m_exiting) {
+                    m_server.m_condition.wait(&m_server.m_fftBuffersLock, 10000);
+                }
             }
             if (m_server.m_exiting) return;
         }