view data/fileio/MatrixFile.cpp @ 527:3c5570e3d9c5

* Fix crash in SimpleSPARQLQuery * Fix failure to add images from local files
author Chris Cannam
date Thu, 15 Jan 2009 18:20:40 +0000
parents 3e0f1f7bec85
children 3cc4b7cd2aa5
line wrap: on
line source
/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*-  vi:set ts=8 sts=4 sw=4: */

/*
    Sonic Visualiser
    An audio file viewer and annotation editor.
    Centre for Digital Music, Queen Mary, University of London.
    This file copyright 2006 Chris Cannam.
    
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
    published by the Free Software Foundation; either version 2 of the
    License, or (at your option) any later version.  See the file
    COPYING included with this distribution for more information.
*/

#include "MatrixFile.h"
#include "base/TempDirectory.h"
#include "system/System.h"
#include "base/Profiler.h"
#include "base/Exceptions.h"
#include "base/Thread.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

#include <iostream>

#include <cstdio>
#include <cassert>

#include <cstdlib>

#include <QFileInfo>
#include <QDir>

//#define DEBUG_MATRIX_FILE 1
//#define DEBUG_MATRIX_FILE_READ_SET 1

#ifdef DEBUG_MATRIX_FILE_READ_SET
#ifndef DEBUG_MATRIX_FILE
#define DEBUG_MATRIX_FILE 1
#endif
#endif

std::map<QString, int> MatrixFile::m_refcount;
QMutex MatrixFile::m_refcountMutex;

MatrixFile::ResizeableBitsetMap MatrixFile::m_columnBitsets;
QMutex MatrixFile::m_columnBitsetWriteMutex;

FileReadThread *MatrixFile::m_readThread = 0;

static size_t totalStorage = 0;
static size_t totalMemory = 0;
static size_t totalCount = 0;

MatrixFile::MatrixFile(QString fileBase, Mode mode,
                       size_t cellSize, bool eagerCache) :
    m_fd(-1),
    m_mode(mode),
    m_flags(0),
    m_fmode(0),
    m_cellSize(cellSize),
    m_width(0),
    m_height(0),
    m_headerSize(2 * sizeof(size_t)),
    m_defaultCacheWidth(1024),
    m_prevX(0),
    m_eagerCache(eagerCache),
    m_requestToken(-1),
    m_spareData(0),
    m_columnBitset(0)
{
    Profiler profiler("MatrixFile::MatrixFile", true);

    if (!m_readThread) {
        m_readThread = new FileReadThread;
        m_readThread->start();
    }

    m_cache.data = 0;

    QDir tempDir(TempDirectory::getInstance()->getPath());
    QString fileName(tempDir.filePath(QString("%1.mfc").arg(fileBase)));
    bool newFile = !QFileInfo(fileName).exists();

    if (newFile && m_mode == ReadOnly) {
        std::cerr << "ERROR: MatrixFile::MatrixFile: Read-only mode "
                  << "specified, but cache file does not exist" << std::endl;
        throw FileNotFound(fileName);
    }

    if (!newFile && m_mode == ReadWrite) {
        std::cerr << "Note: MatrixFile::MatrixFile: Read/write mode "
                  << "specified, but file already exists; falling back to "
                  << "read-only mode" << std::endl;
        m_mode = ReadOnly;
    }

    if (!eagerCache && m_mode == ReadOnly) {
        std::cerr << "WARNING: MatrixFile::MatrixFile: Eager cacheing not "
                  << "specified, but file is open in read-only mode -- cache "
                  << "will not be used" << std::endl;
    }

    m_flags = 0;
    m_fmode = S_IRUSR | S_IWUSR;

    if (m_mode == ReadWrite) {
        m_flags = O_RDWR | O_CREAT;
    } else {
        m_flags = O_RDONLY;
    }

#ifdef _WIN32
    m_flags |= O_BINARY;
#endif

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::MatrixFile: opening " << fileName.toStdString() << "..." << std::endl;
#endif

    if ((m_fd = ::open(fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
        ::perror("Open failed");
        std::cerr << "ERROR: MatrixFile::MatrixFile: "
                  << "Failed to open cache file \""
                  << fileName.toStdString() << "\"";
        if (m_mode == ReadWrite) std::cerr << " for writing";
        std::cerr << std::endl;
        throw FailedToOpenFile(fileName);
    }

    if (newFile) {
        resize(0, 0); // write header
    } else {
        size_t header[2];
        if (::read(m_fd, header, 2 * sizeof(size_t)) < 0) {
            ::perror("MatrixFile::MatrixFile: read failed");
            std::cerr << "ERROR: MatrixFile::MatrixFile: "
                      << "Failed to read header (fd " << m_fd << ", file \""
                      << fileName.toStdString() << "\")" << std::endl;
            throw FileReadFailed(fileName);
        }
        m_width = header[0];
        m_height = header[1];
        seekTo(0, 0);
    }

    m_fileName = fileName;

    {
        MutexLocker locker
            (&m_columnBitsetWriteMutex,
             "MatrixFile::MatrixFile::m_columnBitsetWriteMutex");

        if (m_columnBitsets.find(m_fileName) == m_columnBitsets.end()) {
            m_columnBitsets[m_fileName] = new ResizeableBitset;
        }
        m_columnBitset = m_columnBitsets[m_fileName];
    }

    MutexLocker locker(&m_refcountMutex,
                       "MatrixFile::MatrixFile::m_refcountMutex");
    ++m_refcount[fileName];

//    std::cerr << "MatrixFile(" << this << "): fd " << m_fd << ", file " << fileName.toStdString() << ", ref " << m_refcount[fileName] << std::endl;

//    std::cerr << "MatrixFile::MatrixFile: Done, size is " << "(" << m_width << ", " << m_height << ")" << std::endl;

    ++totalCount;

}

MatrixFile::~MatrixFile()
{
    char *requestData = 0;

    if (m_requestToken >= 0) {
        FileReadThread::Request request;
        if (m_readThread->getRequest(m_requestToken, request)) {
            requestData = request.data;
        }
        m_readThread->cancel(m_requestToken);
    }

    if (requestData) free(requestData);
    if (m_cache.data) free(m_cache.data);
    if (m_spareData) free(m_spareData);

    if (m_fd >= 0) {
        if (::close(m_fd) < 0) {
            ::perror("MatrixFile::~MatrixFile: close failed");
        }
    }

    if (m_fileName != "") {

        MutexLocker locker(&m_refcountMutex,
                           "MatrixFile::~MatrixFile::m_refcountMutex");

        if (--m_refcount[m_fileName] == 0) {

            if (::unlink(m_fileName.toLocal8Bit())) {
//                ::perror("Unlink failed");
//                std::cerr << "WARNING: MatrixFile::~MatrixFile: reference count reached 0, but failed to unlink file \"" << m_fileName.toStdString() << "\"" << std::endl;
            } else {
//                std::cerr << "deleted " << m_fileName.toStdString() << std::endl;
            }

            MutexLocker locker2
                (&m_columnBitsetWriteMutex,
                 "MatrixFile::~MatrixFile::m_columnBitsetWriteMutex");
            m_columnBitsets.erase(m_fileName);
            delete m_columnBitset;
        }
    }
    
    totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize));
    totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize);
    totalCount --;

//    std::cerr << "MatrixFile::~MatrixFile: " << std::endl;
//    std::cerr << "Total storage now " << totalStorage/1024 << "K, theoretical max memory "
//              << totalMemory/1024 << "K in " << totalCount << " instances" << std::endl;

}

void
MatrixFile::resize(size_t w, size_t h)
{
    Profiler profiler("MatrixFile::resize", true);

    assert(m_mode == ReadWrite);

    MutexLocker locker(&m_fdMutex, "MatrixFile::resize::m_fdMutex");
    
    totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize));
    totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize);

    off_t off = m_headerSize + (w * h * m_cellSize);

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::resize(" << w << ", " << h << "): resizing file" << std::endl;
#endif

    if (w * h < m_width * m_height) {
        if (::ftruncate(m_fd, off) < 0) {
            ::perror("WARNING: MatrixFile::resize: ftruncate failed");
            throw FileOperationFailed(m_fileName, "ftruncate");
        }
    }

    m_width = 0;
    m_height = 0;

    if (::lseek(m_fd, 0, SEEK_SET) == (off_t)-1) {
        ::perror("ERROR: MatrixFile::resize: Seek to write header failed");
        throw FileOperationFailed(m_fileName, "lseek");
    }

    size_t header[2];
    header[0] = w;
    header[1] = h;
    if (::write(m_fd, header, 2 * sizeof(size_t)) != 2 * sizeof(size_t)) {
        ::perror("ERROR: MatrixFile::resize: Failed to write header");
        throw FileOperationFailed(m_fileName, "write");
    }

    if (w > 0 && m_defaultCacheWidth > w) {
        m_defaultCacheWidth = w;
    }

//!!!    static size_t maxCacheMB = 16;
    static size_t maxCacheMB = 4;
    if (2 * m_defaultCacheWidth * h * m_cellSize > maxCacheMB * 1024 * 1024) { //!!!
        m_defaultCacheWidth = (maxCacheMB * 1024 * 1024) / (2 * h * m_cellSize);
        if (m_defaultCacheWidth < 16) m_defaultCacheWidth = 16;
    }

    if (m_columnBitset) {
        MutexLocker locker(&m_columnBitsetWriteMutex,
                           "MatrixFile::resize::m_columnBitsetWriteMutex");
        m_columnBitset->resize(w);
    }

    if (m_cache.data) {
        free(m_cache.data);
        m_cache.data = 0;
    }

    if (m_spareData) {
        free(m_spareData);
        m_spareData = 0;
    }
    
    m_width = w;
    m_height = h;

    totalStorage += (m_headerSize + (m_width * m_height * m_cellSize));
    totalMemory += (2 * m_defaultCacheWidth * m_height * m_cellSize);

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::resize(" << w << ", " << h << "): cache width "
              << m_defaultCacheWidth << ", storage "
              << (m_headerSize + w * h * m_cellSize) << ", mem "
              << (2 * h * m_defaultCacheWidth * m_cellSize) << std::endl;

    std::cerr << "Total storage " << totalStorage/1024 << "K, theoretical max memory "
              << totalMemory/1024 << "K in " << totalCount << " instances" << std::endl;
#endif

    seekTo(0, 0);
}

void
MatrixFile::reset()
{
    Profiler profiler("MatrixFile::reset", true);

    assert (m_mode == ReadWrite);
    
    if (m_eagerCache) {
        void *emptyCol = calloc(m_height, m_cellSize);
        for (size_t x = 0; x < m_width; ++x) setColumnAt(x, emptyCol);
        free(emptyCol);
    }
    
    if (m_columnBitset) {
        MutexLocker locker(&m_columnBitsetWriteMutex,
                           "MatrixFile::reset::m_columnBitsetWriteMutex");
        m_columnBitset->resize(m_width);
    }
}

void
MatrixFile::getColumnAt(size_t x, void *data)
{
    Profiler profiler("MatrixFile::getColumnAt");

//    assert(haveSetColumnAt(x));

    if (getFromCache(x, 0, m_height, data)) return;

    Profiler profiler2("MatrixFile::getColumnAt (uncached)");

    ssize_t r = 0;

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile::getColumnAt(" << x << ")"
              << ": reading the slow way";

    if (m_requestToken >= 0 &&
        x >= m_requestingX &&
        x <  m_requestingX + m_requestingWidth) {
        
        std::cerr << " (awaiting " << m_requestingX << ", " << m_requestingWidth << " from disk)";
    }

    std::cerr << std::endl;
#endif

    {
        MutexLocker locker(&m_fdMutex, "MatrixFile::getColumnAt::m_fdMutex");

        if (seekTo(x, 0)) {
            r = ::read(m_fd, data, m_height * m_cellSize);
        }
    }
    
    if (r < 0) {
        ::perror("MatrixFile::getColumnAt: read failed");
        std::cerr << "ERROR: MatrixFile::getColumnAt: "
                  << "Failed to read column " << x << " (height " << m_height << ", cell size " << m_cellSize << ", fd " << m_fd << ", file \""
                  << m_fileName.toStdString() << "\")" << std::endl;
        throw FileReadFailed(m_fileName);
    }

    return;
}

bool
MatrixFile::getFromCache(size_t x, size_t ystart, size_t ycount, void *data)
{
    bool fail = false;
    bool primeLeft = false;

    {
        MutexLocker locker(&m_cacheMutex,
                           "MatrixFile::getFromCache::m_cacheMutex");

        if (!m_cache.data || x < m_cache.x || x >= m_cache.x + m_cache.width) {
            fail = true;
            primeLeft = (m_cache.data && x < m_cache.x);
        } else {
            memcpy(data,
                   m_cache.data + m_cellSize * ((x - m_cache.x) * m_height + ystart),
                   ycount * m_cellSize);
        }            
    }

    if (fail) {
        primeCache(x, primeLeft); // this doesn't take effect until a later callback
        m_prevX = x;
        return false;
    }

    if (m_cache.x > 0 && x < m_prevX && x < m_cache.x + m_cache.width/4) {
        primeCache(x, true);
    }

    if (m_cache.x + m_cache.width < m_width &&
        x > m_prevX &&
        x > m_cache.x + (m_cache.width * 3) / 4) {
        primeCache(x, false);
    }

    m_prevX = x;
    return true;
}

void
MatrixFile::setColumnAt(size_t x, const void *data)
{
    assert(m_mode == ReadWrite);

#ifdef DEBUG_MATRIX_FILE_READ_SET
//    std::cerr << "MatrixFile::setColumnAt(" << x << ")" << std::endl;
    std::cerr << ".";
#endif

    ssize_t w = 0;
    bool seekFailed = false;
    
    {
        MutexLocker locker(&m_fdMutex, "MatrixFile::setColumnAt::m_fdMutex");

        if (seekTo(x, 0)) {
            w = ::write(m_fd, data, m_height * m_cellSize);
        } else {
            seekFailed = true;
        }
    }

    if (!seekFailed && w != ssize_t(m_height * m_cellSize)) {
        ::perror("WARNING: MatrixFile::setColumnAt: write failed");
        throw FileOperationFailed(m_fileName, "write");
    } else if (seekFailed) {
        throw FileOperationFailed(m_fileName, "seek");
    } else {
        MutexLocker locker
            (&m_columnBitsetWriteMutex,
             "MatrixFile::setColumnAt::m_columnBitsetWriteMutex");
        m_columnBitset->set(x);
    }
}

void
MatrixFile::suspend()
{
    MutexLocker locker(&m_fdMutex, "MatrixFile::suspend::m_fdMutex");
    MutexLocker locker2(&m_cacheMutex, "MatrixFile::suspend::m_cacheMutex");

    if (m_fd < 0) return; // already suspended

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::suspend(): fd was " << m_fd << std::endl;
#endif

    if (m_requestToken >= 0) {
        void *data = 0;
        FileReadThread::Request request;
        if (m_readThread->getRequest(m_requestToken, request)) {
            data = request.data;
        }
        m_readThread->cancel(m_requestToken);
        if (data) free(data);
        m_requestToken = -1;
    }

    if (m_cache.data) {
        free(m_cache.data);
        m_cache.data = 0;
    }

    if (m_spareData) {
        free(m_spareData);
        m_spareData = 0;
    }
    
    if (::close(m_fd) < 0) {
        ::perror("WARNING: MatrixFile::suspend: close failed");
        throw FileOperationFailed(m_fileName, "close");
    }

    m_fd = -1;
}

void
MatrixFile::resume()
{
    if (m_fd >= 0) return;

#ifdef DEBUG_MATRIX_FILE    
    std::cerr << "MatrixFile(" << this << ")::resume()" << std::endl;
#endif

    if ((m_fd = ::open(m_fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
        ::perror("Open failed");
        std::cerr << "ERROR: MatrixFile::resume: "
                  << "Failed to open cache file \""
                  << m_fileName.toStdString() << "\"";
        if (m_mode == ReadWrite) std::cerr << " for writing";
        std::cerr << std::endl;
        throw FailedToOpenFile(m_fileName);
    }

#ifdef DEBUG_MATRIX_FILE
    std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::resume(): fd is " << m_fd << std::endl;
#endif
}

void
MatrixFile::primeCache(size_t x, bool goingLeft)
{
//    Profiler profiler("MatrixFile::primeCache");

#ifdef DEBUG_MATRIX_FILE_READ_SET
    std::cerr << "MatrixFile::primeCache(" << x << ", " << goingLeft << ")" << std::endl;
#endif

    size_t rx = x;
    size_t rw = m_defaultCacheWidth;

    size_t left = rw / 3;
    if (goingLeft) left = (rw * 2) / 3;

    if (rx > left) rx -= left;
    else rx = 0;

    if (rx + rw > m_width) rw = m_width - rx;

    if (!m_eagerCache) {

        size_t ti = 0;

        for (ti = 0; ti < rw; ++ti) {
            if (!m_columnBitset->get(rx + ti)) break;
        }
        
#ifdef DEBUG_MATRIX_FILE
        if (ti < rw) {
            std::cerr << "eagerCache is false and there's a hole at "
                      << rx + ti << ", reducing rw from " << rw << " to "
                      << ti << std::endl;
        }
#endif

        rw = std::min(rw, ti);
        if (rw < 10 || rx + rw <= x) return;
    }

    MutexLocker locker(&m_cacheMutex, "MatrixFile::primeCache::m_cacheMutex");

    // Check for the existence of the request first; if it exists,
    // check whether it's ready.  Only when we know it's ready do we
    // retrieve the actual request, because the reason we need the
    // request is to check whether it was successful or not and
    // extract data from it, and none of that can be relied upon if we
    // retrieve the request before it's ready.  (There used to be a
    // race condition here, where we retrieved the request and only
    // afterwards checked the ready status, pulling data from the
    // request if it was found to be ready then.)

    if (m_requestToken >= 0 &&
        m_readThread->haveRequest(m_requestToken)) {

        if (x >= m_requestingX &&
            x <  m_requestingX + m_requestingWidth) {

            if (m_readThread->isReady(m_requestToken)) {

                FileReadThread::Request request;
                if (!m_readThread->getRequest(m_requestToken, request)) {
                    std::cerr << "ERROR: MatrixFile::primeCache: File read thread has lost our request!" << std::endl;
                    throw FileReadFailed(m_fileName);
                }

                if (!request.successful) {
                    std::cerr << "ERROR: MatrixFile::primeCache: Last request was unsuccessful" << std::endl;
                    throw FileReadFailed(m_fileName);
                }
                
#ifdef DEBUG_MATRIX_FILE_READ_SET
                std::cerr << "last request is ready! (" << m_requestingX << ", "<< m_requestingWidth << ")"  << std::endl;
#endif

                m_cache.x = (request.start - m_headerSize) / (m_height * m_cellSize);
                m_cache.width = request.size / (m_height * m_cellSize);
      
#ifdef DEBUG_MATRIX_FILE_READ_SET
                std::cerr << "received last request: actual size is: " << m_cache.x << ", " << m_cache.width << std::endl;
#endif

                if (m_cache.data) {
                    if (m_spareData) {
//                        std::cerr << this << ": Freeing spare data" << std::endl;
                        free(m_spareData);
                    }
//                    std::cerr << this << ": Moving old cache data to spare" << std::endl;
                    m_spareData = m_cache.data;
                }
//                std::cerr << this << ": Moving request data to cache" << std::endl;
                m_cache.data = request.data;

                m_readThread->done(m_requestToken);
                m_requestToken = -1;
            }

            // already requested something covering this area; wait for it
            return;
        }

        FileReadThread::Request dud;

        if (!m_readThread->getRequest(m_requestToken, dud)) {

            std::cerr << "ERROR: MatrixFile::primeCache: Inconsistent replies from FileReadThread" << std::endl;

        } else {

            // current request is for the wrong area, so no longer of any use
            m_readThread->cancel(m_requestToken);

            // crude way to avoid leaking the data
            while (!m_readThread->isCancelled(m_requestToken)) {
                usleep(10000);
            }

#ifdef DEBUG_MATRIX_FILE_READ_SET
            std::cerr << "cancelled " << m_requestToken << std::endl;
#endif

            if (m_spareData) {
                free(m_spareData);
            }
            m_spareData = dud.data;
            m_readThread->done(m_requestToken);
        }

        m_requestToken = -1;
    }

    if (m_fd < 0) {
        MutexLocker locker(&m_fdMutex, "MatrixFile::primeCache::m_fdMutex");
        if (m_fd < 0) resume();
    }

    FileReadThread::Request request;

    request.fd = m_fd;
    request.mutex = &m_fdMutex;
    request.start = m_headerSize + rx * m_height * m_cellSize;
    request.size = rw * m_height * m_cellSize;

//    std::cerr << this << ": Moving spare data to request, and resizing to " << rw * m_height * m_cellSize << std::endl;

    request.data = (char *)realloc(m_spareData, rw * m_height * m_cellSize);
    MUNLOCK(request.data, rw * m_height * m_cellSize);
    m_spareData = 0;

    m_requestingX = rx;
    m_requestingWidth = rw;

    int token = m_readThread->request(request);
#ifdef DEBUG_MATRIX_FILE_READ_SET
    std::cerr << "MatrixFile::primeCache: request token is "
              << token << " (x = [" << rx << "], w = [" << rw << "], left = [" << goingLeft << "])" << std::endl;
#endif
    m_requestToken = token;
}

bool
MatrixFile::seekTo(size_t x, size_t y)
{
    if (m_fd < 0) resume();

    off_t off = m_headerSize + (x * m_height + y) * m_cellSize;

    if (::lseek(m_fd, off, SEEK_SET) == (off_t)-1) {
        ::perror("Seek failed");
        std::cerr << "ERROR: MatrixFile::seekTo(" << x << ", " << y
                  << ") failed" << std::endl;
        return false;
    }

    return true;
}