view data/fileio/MatrixFile.cpp @ 458:f60360209e5c

* Fix race condition in FFTFileCache when reading from the same FFT model from multiple threads (e.g. when applying more than one plugin at once)
author Chris Cannam
date Wed, 15 Oct 2008 12:08:02 +0000
parents 3e0f1f7bec85
children 3cc4b7cd2aa5
line wrap: on
line source
/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*-  vi:set ts=8 sts=4 sw=4: */

/*
    Sonic Visualiser
    An audio file viewer and annotation editor.
    Centre for Digital Music, Queen Mary, University of London.
    This file copyright 2006 Chris Cannam.
    
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
    published by the Free Software Foundation; either version 2 of the
    License, or (at your option) any later version.  See the file
    COPYING included with this distribution for more information.
*/

#include "MatrixFile.h"
#include "base/TempDirectory.h"
#include "system/System.h"
#include "base/Profiler.h"
#include "base/Exceptions.h"
#include "base/Thread.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include <iostream>

#include <cstdio>
#include <cassert>

#include <cstdlib>

#include <QFileInfo>
#include <QDir>

//#define DEBUG_MATRIX_FILE 1
//#define DEBUG_MATRIX_FILE_READ_SET 1

#ifdef DEBUG_MATRIX_FILE_READ_SET
#ifndef DEBUG_MATRIX_FILE
#define DEBUG_MATRIX_FILE 1
#endif
#endif

std::map<QString, int> MatrixFile::m_refcount;
QMutex MatrixFile::m_refcountMutex;

MatrixFile::ResizeableBitsetMap MatrixFile::m_columnBitsets;
QMutex MatrixFile::m_columnBitsetWriteMutex;

FileReadThread *MatrixFile::m_readThread = 0;

static size_t totalStorage = 0;
static size_t totalMemory = 0;
static size_t totalCount = 0;

MatrixFile::MatrixFile(QString fileBase, Mode mode,
                       size_t cellSize, bool eagerCache) :
    m_fd(-1),
    m_mode(mode),
    m_flags(0),
    m_fmode(0),
    m_cellSize(cellSize),
    m_width(0),
    m_height(0),
    m_headerSize(2 * sizeof(size_t)),
    m_defaultCacheWidth(1024),
    m_prevX(0),
    m_eagerCache(eagerCache),
    m_requestToken(-1),
    m_spareData(0),
    m_columnBitset(0)
{
    Profiler profiler("MatrixFile::MatrixFile", true);

    if (!m_readThread) {
        m_readThread = new FileReadThread;
        m_readThread->start();
    }

    m_cache.data = 0;

    QDir tempDir(TempDirectory::getInstance()->getPath());
    QString fileName(tempDir.filePath(QString("%1.mfc").arg(fileBase)));
    bool newFile = !QFileInfo(fileName).exists();

    if (newFile && m_mode == ReadOnly) {
        std::cerr << "ERROR: MatrixFile::MatrixFile: Read-only mode "
                  << "specified, but cache file does not exist" << std::endl;
        throw FileNotFound(fileName);
    }

    if (!newFile && m_mode == ReadWrite) {
        std::cerr << "Note: MatrixFile::MatrixFile: Read/write mode "
                  << "specified, but file already exists; falling back to "
                  << "read-only mode" << std::endl;
        m_mode = ReadOnly;
    }

    if (!eagerCache && m_mode == ReadOnly) {
        std::cerr << "WARNING: MatrixFile::MatrixFile: Eager cacheing not "
                  << "specified, but file is open in read-only mode -- cache "
                  << "will not be used" << std::endl;
    }

    m_flags = 0;
    m_fmode = S_IRUSR | S_IWUSR;

    if (m_mode == ReadWrite) {
        m_flags = O_RDWR | O_CREAT;
    } else {
        m_flags = O_RDONLY;
    }

#ifdef _WIN32
    m_flags |= O_BINARY;
#endif

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::MatrixFile: opening " << fileName.toStdString() << "..." << std::endl;
#endif

    if ((m_fd = ::open(fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
        ::perror("Open failed");
        std::cerr << "ERROR: MatrixFile::MatrixFile: "
                  << "Failed to open cache file \""
                  << fileName.toStdString() << "\"";
        if (m_mode == ReadWrite) std::cerr << " for writing";
        std::cerr << std::endl;
        throw FailedToOpenFile(fileName);
    }

    if (newFile) {
        resize(0, 0); // write header
    } else {
        size_t header[2];
        if (::read(m_fd, header, 2 * sizeof(size_t)) < 0) {
            ::perror("MatrixFile::MatrixFile: read failed");
            std::cerr << "ERROR: MatrixFile::MatrixFile: "
                      << "Failed to read header (fd " << m_fd << ", file \""
                      << fileName.toStdString() << "\")" << std::endl;
            throw FileReadFailed(fileName);
        }
        m_width = header[0];
        m_height = header[1];
        seekTo(0, 0);
    }

    m_fileName = fileName;

    {
        MutexLocker locker
            (&m_columnBitsetWriteMutex,
             "MatrixFile::MatrixFile::m_columnBitsetWriteMutex");

        if (m_columnBitsets.find(m_fileName) == m_columnBitsets.end()) {
            m_columnBitsets[m_fileName] = new ResizeableBitset;
        }
        m_columnBitset = m_columnBitsets[m_fileName];
    }

    MutexLocker locker(&m_refcountMutex,
                       "MatrixFile::MatrixFile::m_refcountMutex");
    ++m_refcount[fileName];

//    std::cerr << "MatrixFile(" << this << "): fd " << m_fd << ", file " << fileName.toStdString() << ", ref " << m_refcount[fileName] << std::endl;

//    std::cerr << "MatrixFile::MatrixFile: Done, size is " << "(" << m_width << ", " << m_height << ")" << std::endl;

    ++totalCount;

}

MatrixFile::~MatrixFile()
{
    char *requestData = 0;

    if (m_requestToken >= 0) {
        FileReadThread::Request request;
        if (m_readThread->getRequest(m_requestToken, request)) {
            requestData = request.data;
        }
        m_readThread->cancel(m_requestToken);
    }

    if (requestData) free(requestData);
    if (m_cache.data) free(m_cache.data);
    if (m_spareData) free(m_spareData);

    if (m_fd >= 0) {
        if (::close(m_fd) < 0) {
            ::perror("MatrixFile::~MatrixFile: close failed");
        }
    }

    if (m_fileName != "") {

        MutexLocker locker(&m_refcountMutex,
                           "MatrixFile::~MatrixFile::m_refcountMutex");

        if (--m_refcount[m_fileName] == 0) {

            if (::unlink(m_fileName.toLocal8Bit())) {
//                ::perror("Unlink failed");
//                std::cerr << "WARNING: MatrixFile::~MatrixFile: reference count reached 0, but failed to unlink file \"" << m_fileName.toStdString() << "\"" << std::endl;
            } else {
//                std::cerr << "deleted " << m_fileName.toStdString() << std::endl;
            }

            MutexLocker locker2
                (&m_columnBitsetWriteMutex,
                 "MatrixFile::~MatrixFile::m_columnBitsetWriteMutex");
            m_columnBitsets.erase(m_fileName);
            delete m_columnBitset;
        }
    }
    
    totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize));
    totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize);
    totalCount --;

//    std::cerr << "MatrixFile::~MatrixFile: " << std::endl;
//    std::cerr << "Total storage now " << totalStorage/1024 << "K, theoretical max memory "
//              << totalMemory/1024 << "K in " << totalCount << " instances" << std::endl;

}

void
MatrixFile::resize(size_t w, size_t h)
{
    Profiler profiler("MatrixFile::resize", true);

    assert(m_mode == ReadWrite);

    MutexLocker locker(&m_fdMutex, "MatrixFile::resize::m_fdMutex");
    
    totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize));
    totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize);

    off_t off = m_headerSize + (w * h * m_cellSize);

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::resize(" << w << ", " << h << "): resizing file" << std::endl;
#endif

    if (w * h < m_width * m_height) {
        if (::ftruncate(m_fd, off) < 0) {
            ::perror("WARNING: MatrixFile::resize: ftruncate failed");
            throw FileOperationFailed(m_fileName, "ftruncate");
        }
    }

    m_width = 0;
    m_height = 0;

    if (::lseek(m_fd, 0, SEEK_SET) == (off_t)-1) {
        ::perror("ERROR: MatrixFile::resize: Seek to write header failed");
        throw FileOperationFailed(m_fileName, "lseek");
    }

    size_t header[2];
    header[0] = w;
    header[1] = h;
    if (::write(m_fd, header, 2 * sizeof(size_t)) != 2 * sizeof(size_t)) {
        ::perror("ERROR: MatrixFile::resize: Failed to write header");
        throw FileOperationFailed(m_fileName, "write");
    }

    if (w > 0 && m_defaultCacheWidth > w) {
        m_defaultCacheWidth = w;
    }

//!!!    static size_t maxCacheMB = 16;
    static size_t maxCacheMB = 4;
    if (2 * m_defaultCacheWidth * h * m_cellSize > maxCacheMB * 1024 * 1024) { //!!!
        m_defaultCacheWidth = (maxCacheMB * 1024 * 1024) / (2 * h * m_cellSize);
        if (m_defaultCacheWidth < 16) m_defaultCacheWidth = 16;
    }

    if (m_columnBitset) {
        MutexLocker locker(&m_columnBitsetWriteMutex,
                           "MatrixFile::resize::m_columnBitsetWriteMutex");
        m_columnBitset->resize(w);
    }

    if (m_cache.data) {
        free(m_cache.data);
        m_cache.data = 0;
    }

    if (m_spareData) {
        free(m_spareData);
        m_spareData = 0;
    }
    
    m_width = w;
    m_height = h;

    totalStorage += (m_headerSize + (m_width * m_height * m_cellSize));
    totalMemory += (2 * m_defaultCacheWidth * m_height * m_cellSize);

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::resize(" << w << ", " << h << "): cache width "
              << m_defaultCacheWidth << ", storage "
              << (m_headerSize + w * h * m_cellSize) << ", mem "
              << (2 * h * m_defaultCacheWidth * m_cellSize) << std::endl;

    std::cerr << "Total storage " << totalStorage/1024 << "K, theoretical max memory "
              << totalMemory/1024 << "K in " << totalCount << " instances" << std::endl;
#endif

    seekTo(0, 0);
}

void
MatrixFile::reset()
{
    Profiler profiler("MatrixFile::reset", true);

    assert (m_mode == ReadWrite);
    
    if (m_eagerCache) {
        void *emptyCol = calloc(m_height, m_cellSize);
        for (size_t x = 0; x < m_width; ++x) setColumnAt(x, emptyCol);
        free(emptyCol);
    }
    
    if (m_columnBitset) {
        MutexLocker locker(&m_columnBitsetWriteMutex,
                           "MatrixFile::reset::m_columnBitsetWriteMutex");
        m_columnBitset->resize(m_width);
    }
}

void
MatrixFile::getColumnAt(size_t x, void *data)
{
    Profiler profiler("MatrixFile::getColumnAt");

//    assert(haveSetColumnAt(x));

    if (getFromCache(x, 0, m_height, data)) return;

    Profiler profiler2("MatrixFile::getColumnAt (uncached)");

    ssize_t r = 0;

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::getColumnAt(" << x << ")"
              << ": reading the slow way";

    if (m_requestToken >= 0 &&
        x >= m_requestingX &&
        x <  m_requestingX + m_requestingWidth) {
        
        std::cerr << " (awaiting " << m_requestingX << ", " << m_requestingWidth << " from disk)";
    }

    std::cerr << std::endl;
#endif

    {
        MutexLocker locker(&m_fdMutex, "MatrixFile::getColumnAt::m_fdMutex");

        if (seekTo(x, 0)) {
            r = ::read(m_fd, data, m_height * m_cellSize);
        }
    }
    
    if (r < 0) {
        ::perror("MatrixFile::getColumnAt: read failed");
        std::cerr << "ERROR: MatrixFile::getColumnAt: "
                  << "Failed to read column " << x << " (height " << m_height << ", cell size " << m_cellSize << ", fd " << m_fd << ", file \""
                  << m_fileName.toStdString() << "\")" << std::endl;
        throw FileReadFailed(m_fileName);
    }

    return;
}

bool
MatrixFile::getFromCache(size_t x, size_t ystart, size_t ycount, void *data)
{
    bool fail = false;
    bool primeLeft = false;

    {
        MutexLocker locker(&m_cacheMutex,
                           "MatrixFile::getFromCache::m_cacheMutex");

        if (!m_cache.data || x < m_cache.x || x >= m_cache.x + m_cache.width) {
            fail = true;
            primeLeft = (m_cache.data && x < m_cache.x);
        } else {
            memcpy(data,
                   m_cache.data + m_cellSize * ((x - m_cache.x) * m_height + ystart),
                   ycount * m_cellSize);
        }            
    }

    if (fail) {
        primeCache(x, primeLeft); // this doesn't take effect until a later callback
        m_prevX = x;
        return false;
    }

    if (m_cache.x > 0 && x < m_prevX && x < m_cache.x + m_cache.width/4) {
        primeCache(x, true);
    }

    if (m_cache.x + m_cache.width < m_width &&
        x > m_prevX &&
        x > m_cache.x + (m_cache.width * 3) / 4) {
        primeCache(x, false);
    }

    m_prevX = x;
    return true;
}

void
MatrixFile::setColumnAt(size_t x, const void *data)
{
    assert(m_mode == ReadWrite);

#ifdef DEBUG_MATRIX_FILE_READ_SET
//    std::cerr << "MatrixFile::setColumnAt(" << x << ")" << std::endl;
    std::cerr << ".";
#endif

    ssize_t w = 0;
    bool seekFailed = false;
    
    {
        MutexLocker locker(&m_fdMutex, "MatrixFile::setColumnAt::m_fdMutex");

        if (seekTo(x, 0)) {
            w = ::write(m_fd, data, m_height * m_cellSize);
        } else {
            seekFailed = true;
        }
    }

    if (!seekFailed && w != ssize_t(m_height * m_cellSize)) {
        ::perror("WARNING: MatrixFile::setColumnAt: write failed");
        throw FileOperationFailed(m_fileName, "write");
    } else if (seekFailed) {
        throw FileOperationFailed(m_fileName, "seek");
    } else {
        MutexLocker locker
            (&m_columnBitsetWriteMutex,
             "MatrixFile::setColumnAt::m_columnBitsetWriteMutex");
        m_columnBitset->set(x);
    }
}

void
MatrixFile::suspend()
{
    MutexLocker locker(&m_fdMutex, "MatrixFile::suspend::m_fdMutex");
    MutexLocker locker2(&m_cacheMutex, "MatrixFile::suspend::m_cacheMutex");

    if (m_fd < 0) return; // already suspended

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::suspend(): fd was " << m_fd << std::endl;
#endif

    if (m_requestToken >= 0) {
        void *data = 0;
        FileReadThread::Request request;
        if (m_readThread->getRequest(m_requestToken, request)) {
            data = request.data;
        }
        m_readThread->cancel(m_requestToken);
        if (data) free(data);
        m_requestToken = -1;
    }

    if (m_cache.data) {
        free(m_cache.data);
        m_cache.data = 0;
    }

    if (m_spareData) {
        free(m_spareData);
        m_spareData = 0;
    }
    
    if (::close(m_fd) < 0) {
        ::perror("WARNING: MatrixFile::suspend: close failed");
        throw FileOperationFailed(m_fileName, "close");
    }

    m_fd = -1;
}

void
MatrixFile::resume()
{
    if (m_fd >= 0) return;

#ifdef DEBUG_MATRIX_FILE    
    std::cerr << "MatrixFile(" << this << ")::resume()" << std::endl;
#endif

    if ((m_fd = ::open(m_fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
        ::perror("Open failed");
        std::cerr << "ERROR: MatrixFile::resume: "
                  << "Failed to open cache file \""
                  << m_fileName.toStdString() << "\"";
        if (m_mode == ReadWrite) std::cerr << " for writing";
        std::cerr << std::endl;
        throw FailedToOpenFile(m_fileName);
    }

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::resume(): fd is " << m_fd << std::endl;
#endif
}

void
MatrixFile::primeCache(size_t x, bool goingLeft)
{
//    Profiler profiler("MatrixFile::primeCache");

#ifdef DEBUG_MATRIX_FILE_READ_SET
    std::cerr << "MatrixFile::primeCache(" << x << ", " << goingLeft << ")" << std::endl;
#endif

    size_t rx = x;
    size_t rw = m_defaultCacheWidth;

    size_t left = rw / 3;
    if (goingLeft) left = (rw * 2) / 3;

    if (rx > left) rx -= left;
    else rx = 0;

    if (rx + rw > m_width) rw = m_width - rx;

    if (!m_eagerCache) {

        size_t ti = 0;

        for (ti = 0; ti < rw; ++ti) {
            if (!m_columnBitset->get(rx + ti)) break;
        }
        
#ifdef DEBUG_MATRIX_FILE
        if (ti < rw) {
            std::cerr << "eagerCache is false and there's a hole at "
                      << rx + ti << ", reducing rw from " << rw << " to "
                      << ti << std::endl;
        }
#endif

        rw = std::min(rw, ti);
        if (rw < 10 || rx + rw <= x) return;
    }

    MutexLocker locker(&m_cacheMutex, "MatrixFile::primeCache::m_cacheMutex");

    // Check for the existence of the request first; if it exists,
    // check whether it's ready.  Only when we know it's ready do we
    // retrieve the actual request, because the reason we need the
    // request is to check whether it was successful or not and
    // extract data from it, and none of that can be relied upon if we
    // retrieve the request before it's ready.  (There used to be a
    // race condition here, where we retrieved the request and only
    // afterwards checked the ready status, pulling data from the
    // request if it was found to be ready then.)

    if (m_requestToken >= 0 &&
        m_readThread->haveRequest(m_requestToken)) {

        if (x >= m_requestingX &&
            x <  m_requestingX + m_requestingWidth) {

            if (m_readThread->isReady(m_requestToken)) {

                FileReadThread::Request request;
                if (!m_readThread->getRequest(m_requestToken, request)) {
                    std::cerr << "ERROR: MatrixFile::primeCache: File read thread has lost our request!" << std::endl;
                    throw FileReadFailed(m_fileName);
                }

                if (!request.successful) {
                    std::cerr << "ERROR: MatrixFile::primeCache: Last request was unsuccessful" << std::endl;
                    throw FileReadFailed(m_fileName);
                }
                
#ifdef DEBUG_MATRIX_FILE_READ_SET
                std::cerr << "last request is ready! (" << m_requestingX << ", "<< m_requestingWidth << ")"  << std::endl;
#endif

                m_cache.x = (request.start - m_headerSize) / (m_height * m_cellSize);
                m_cache.width = request.size / (m_height * m_cellSize);
      
#ifdef DEBUG_MATRIX_FILE_READ_SET
                std::cerr << "received last request: actual size is: " << m_cache.x << ", " << m_cache.width << std::endl;
#endif

                if (m_cache.data) {
                    if (m_spareData) {
//                        std::cerr << this << ": Freeing spare data" << std::endl;
                        free(m_spareData);
                    }
//                    std::cerr << this << ": Moving old cache data to spare" << std::endl;
                    m_spareData = m_cache.data;
                }
//                std::cerr << this << ": Moving request data to cache" << std::endl;
                m_cache.data = request.data;

                m_readThread->done(m_requestToken);
                m_requestToken = -1;
            }

            // already requested something covering this area; wait for it
            return;
        }

        FileReadThread::Request dud;

        if (!m_readThread->getRequest(m_requestToken, dud)) {

            std::cerr << "ERROR: MatrixFile::primeCache: Inconsistent replies from FileReadThread" << std::endl;

        } else {

            // current request is for the wrong area, so no longer of any use
            m_readThread->cancel(m_requestToken);

            // crude way to avoid leaking the data
            while (!m_readThread->isCancelled(m_requestToken)) {
                usleep(10000);
            }

#ifdef DEBUG_MATRIX_FILE_READ_SET
            std::cerr << "cancelled " << m_requestToken << std::endl;
#endif

            if (m_spareData) {
                free(m_spareData);
            }
            m_spareData = dud.data;
            m_readThread->done(m_requestToken);
        }

        m_requestToken = -1;
    }

    if (m_fd < 0) {
        MutexLocker locker(&m_fdMutex, "MatrixFile::primeCache::m_fdMutex");
        if (m_fd < 0) resume();
    }

    FileReadThread::Request request;

    request.fd = m_fd;
    request.mutex = &m_fdMutex;
    request.start = m_headerSize + rx * m_height * m_cellSize;
    request.size = rw * m_height * m_cellSize;

//    std::cerr << this << ": Moving spare data to request, and resizing to " << rw * m_height * m_cellSize << std::endl;

    request.data = (char *)realloc(m_spareData, rw * m_height * m_cellSize);
    MUNLOCK(request.data, rw * m_height * m_cellSize);
    m_spareData = 0;

    m_requestingX = rx;
    m_requestingWidth = rw;

    int token = m_readThread->request(request);
#ifdef DEBUG_MATRIX_FILE_READ_SET
    std::cerr << "MatrixFile::primeCache: request token is "
              << token << " (x = [" << rx << "], w = [" << rw << "], left = [" << goingLeft << "])" << std::endl;
#endif
    m_requestToken = token;
}

bool
MatrixFile::seekTo(size_t x, size_t y)
{
    if (m_fd < 0) resume();

    off_t off = m_headerSize + (x * m_height + y) * m_cellSize;

    if (::lseek(m_fd, off, SEEK_SET) == (off_t)-1) {
        ::perror("Seek failed");
        std::cerr << "ERROR: MatrixFile::seekTo(" << x << ", " << y
                  << ") failed" << std::endl;
        return false;
    }

    return true;
}