Mercurial > hg > svcore
changeset 455:3e0f1f7bec85
* Fix a nasty and long-standing race condition in MatrixFile's use of
FileReadThread that was causing crashes sometimes
author | Chris Cannam |
---|---|
date | Thu, 09 Oct 2008 20:10:28 +0000 |
parents | ba7aaacb7211 |
children | 64e64e304a12 |
files | data/fft/FFTDataServer.cpp data/fft/FFTFileCache.cpp data/fileio/FileReadThread.cpp data/fileio/FileReadThread.h data/fileio/MatrixFile.cpp data/fileio/MatrixFile.h |
diffstat | 6 files changed, 84 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/data/fft/FFTDataServer.cpp Thu Oct 09 13:13:33 2008 +0000 +++ b/data/fft/FFTDataServer.cpp Thu Oct 09 20:10:28 2008 +0000 @@ -27,7 +27,7 @@ #include "base/Profiler.h" #include "base/Thread.h" // for debug mutex locker -#define DEBUG_FFT_SERVER 1 +//#define DEBUG_FFT_SERVER 1 //#define DEBUG_FFT_SERVER_FILL 1 #ifdef DEBUG_FFT_SERVER_FILL
--- a/data/fft/FFTFileCache.cpp Thu Oct 09 13:13:33 2008 +0000 +++ b/data/fft/FFTFileCache.cpp Thu Oct 09 20:10:28 2008 +0000 @@ -19,6 +19,7 @@ #include "base/Profiler.h" #include "base/Thread.h" +#include "base/Exceptions.h" #include <iostream> @@ -308,13 +309,19 @@ if (!m_readbuf) { m_readbuf = new char[m_mfc->getHeight() * 2 * m_mfc->getCellSize()]; } - m_mfc->getColumnAt(x, m_readbuf); - if (m_mfc->haveSetColumnAt(x + 1)) { - m_mfc->getColumnAt - (x + 1, m_readbuf + m_mfc->getCellSize() * m_mfc->getHeight()); - m_readbufWidth = 2; - } else { - m_readbufWidth = 1; + try { + m_mfc->getColumnAt(x, m_readbuf); + if (m_mfc->haveSetColumnAt(x + 1)) { + m_mfc->getColumnAt + (x + 1, m_readbuf + m_mfc->getCellSize() * m_mfc->getHeight()); + m_readbufWidth = 2; + } else { + m_readbufWidth = 1; + } + } catch (FileReadFailed f) { + std::cerr << "ERROR: FFTFileCache::populateReadBuf: File read failed: " + << f.what() << std::endl; + memset(m_readbuf, 0, m_mfc->getHeight() * 2 * m_mfc->getCellSize()); } m_readbufCol = x; }
--- a/data/fileio/FileReadThread.cpp Thu Oct 09 13:13:33 2008 +0000 +++ b/data/fileio/FileReadThread.cpp Thu Oct 09 20:10:28 2008 +0000 @@ -21,7 +21,7 @@ #include <iostream> #include <unistd.h> -#define DEBUG_FILE_READ_THREAD 1 +//#define DEBUG_FILE_READ_THREAD 1 FileReadThread::FileReadThread() : m_nextToken(0), @@ -141,6 +141,24 @@ } bool +FileReadThread::haveRequest(int token) +{ + MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex"); + + bool found = false; + + if (m_queue.find(token) != m_queue.end()) { + found = true; + } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { + found = true; + } else if (m_readyRequests.find(token) != m_readyRequests.end()) { + found = true; + } + + return found; +} + +bool FileReadThread::getRequest(int token, Request &request) { MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex"); @@ -244,6 +262,9 @@ } else { if (r < 0) { ::perror("ERROR: FileReadThread::process: Read failed"); + std::cerr << "ERROR: FileReadThread::process: read of " + << request.size << " at " + << request.start << " failed" << std::endl; request.size = 0; } else if (r < ssize_t(request.size)) { std::cerr << "WARNING: FileReadThread::process: read " @@ -267,11 +288,15 @@ m_queue.erase(token); m_readyRequests[token] = request; #ifdef DEBUG_FILE_READ_THREAD - std::cerr << "FileReadThread::process: done, marking as ready" << std::endl; + std::cerr << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << std::endl; #endif } else { #ifdef DEBUG_FILE_READ_THREAD - std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl; + if (m_exiting) { + std::cerr << "FileReadThread::process: exiting" << std::endl; + } else { + std::cerr << "FileReadThread::process: request disappeared" << std::endl; + } #endif } }
--- a/data/fileio/FileReadThread.h Thu Oct 09 13:13:33 2008 +0000 +++ b/data/fileio/FileReadThread.h Thu Oct 09 20:10:28 2008 +0000 @@ -50,6 +50,7 @@ virtual bool isReady(int token); virtual bool isCancelled(int token); // and safe to delete + virtual bool haveRequest(int token); virtual bool getRequest(int token, Request &request); virtual void done(int token);
--- a/data/fileio/MatrixFile.cpp Thu Oct 09 13:13:33 2008 +0000 +++ b/data/fileio/MatrixFile.cpp Thu Oct 09 20:10:28 2008 +0000 @@ -35,7 +35,7 @@ #include <QFileInfo> #include <QDir> -#define DEBUG_MATRIX_FILE 1 +//#define DEBUG_MATRIX_FILE 1 //#define DEBUG_MATRIX_FILE_READ_SET 1 #ifdef DEBUG_MATRIX_FILE_READ_SET @@ -521,8 +521,6 @@ #endif } -static int alloc = 0; - void MatrixFile::primeCache(size_t x, bool goingLeft) { @@ -565,16 +563,30 @@ MutexLocker locker(&m_cacheMutex, "MatrixFile::primeCache::m_cacheMutex"); - FileReadThread::Request request; + // Check for the existence of the request first; if it exists, + // check whether it's ready. Only when we know it's ready do we + // retrieve the actual request, because the reason we need the + // request is to check whether it was successful or not and + // extract data from it, and none of that can be relied upon if we + // retrieve the request before it's ready. (There used to be a + // race condition here, where we retrieved the request and only + // afterwards checked the ready status, pulling data from the + // request if it was found to be ready then.) if (m_requestToken >= 0 && - m_readThread->getRequest(m_requestToken, request)) { + m_readThread->haveRequest(m_requestToken)) { if (x >= m_requestingX && x < m_requestingX + m_requestingWidth) { if (m_readThread->isReady(m_requestToken)) { + FileReadThread::Request request; + if (!m_readThread->getRequest(m_requestToken, request)) { + std::cerr << "ERROR: MatrixFile::primeCache: File read thread has lost our request!" << std::endl; + throw FileReadFailed(m_fileName); + } + if (!request.successful) { std::cerr << "ERROR: MatrixFile::primeCache: Last request was unsuccessful" << std::endl; throw FileReadFailed(m_fileName); @@ -610,25 +622,32 @@ return; } - // the current request is no longer of any use - m_readThread->cancel(m_requestToken); + FileReadThread::Request dud; - // crude way to avoid leaking the data - while (!m_readThread->isCancelled(m_requestToken)) { - usleep(10000); - } + if (!m_readThread->getRequest(m_requestToken, dud)) { + + std::cerr << "ERROR: MatrixFile::primeCache: Inconsistent replies from FileReadThread" << std::endl; + + } else { + + // current request is for the wrong area, so no longer of any use + m_readThread->cancel(m_requestToken); + + // crude way to avoid leaking the data + while (!m_readThread->isCancelled(m_requestToken)) { + usleep(10000); + } #ifdef DEBUG_MATRIX_FILE_READ_SET - std::cerr << "cancelled " << m_requestToken << std::endl; + std::cerr << "cancelled " << m_requestToken << std::endl; #endif - if (m_spareData) { -// std::cerr << this << ": Freeing spare data" << std::endl; - free(m_spareData); + if (m_spareData) { + free(m_spareData); + } + m_spareData = dud.data; + m_readThread->done(m_requestToken); } -// std::cerr << this << ": Moving request data to spare" << std::endl; - m_spareData = request.data; - m_readThread->done(m_requestToken); m_requestToken = -1; } @@ -638,6 +657,8 @@ if (m_fd < 0) resume(); } + FileReadThread::Request request; + request.fd = m_fd; request.mutex = &m_fdMutex; request.start = m_headerSize + rx * m_height * m_cellSize;
--- a/data/fileio/MatrixFile.h Thu Oct 09 13:13:33 2008 +0000 +++ b/data/fileio/MatrixFile.h Thu Oct 09 20:10:28 2008 +0000 @@ -68,7 +68,7 @@ void reset(); bool haveSetColumnAt(size_t x) const { return m_columnBitset->get(x); } - void getColumnAt(size_t x, void *data); + void getColumnAt(size_t x, void *data); // may throw FileReadFailed void setColumnAt(size_t x, const void *data); void suspend();