view data/fileio/FileReadThread.cpp @ 316:3a6725f285d6

* Make RemoteFile far more pervasive, and use it for local files as well so that we can handle both transparently. Make it shallow copy with reference counting, so it can be used by value without having to worry about the cache file lifetime. Use RemoteFile for MainWindow file-open functions, etc
author Chris Cannam
date Thu, 18 Oct 2007 15:31:20 +0000
parents 1a42221a1522
children 115f60df1e4d
line wrap: on
line source
/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*-  vi:set ts=8 sts=4 sw=4: */

/*
    Sonic Visualiser
    An audio file viewer and annotation editor.
    Centre for Digital Music, Queen Mary, University of London.
    This file copyright 2006 Chris Cannam.
    
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
    published by the Free Software Foundation; either version 2 of the
    License, or (at your option) any later version.  See the file
    COPYING included with this distribution for more information.
*/

#include "FileReadThread.h"

#include "base/Profiler.h"

#include <iostream>
#include <unistd.h>

//#define DEBUG_FILE_READ_THREAD 1

FileReadThread::FileReadThread() :
    m_nextToken(0),
    m_exiting(false)
{
}

void
FileReadThread::run()
{
    m_mutex.lock();

    while (!m_exiting) {
        if (m_queue.empty()) {
            m_condition.wait(&m_mutex, 1000);
        } else {
            process();
        }
        notifyCancelled();
    }

    notifyCancelled();
    m_mutex.unlock();

#ifdef DEBUG_FILE_READ_THREAD
    std::cerr << "FileReadThread::run() exiting" << std::endl;
#endif
}

void
FileReadThread::finish()
{
#ifdef DEBUG_FILE_READ_THREAD
    std::cerr << "FileReadThread::finish()" << std::endl;
#endif

    m_mutex.lock();
    while (!m_queue.empty()) {
        m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
        m_newlyCancelled.insert(m_queue.begin()->first);
        m_queue.erase(m_queue.begin());
    }

    m_exiting = true;
    m_mutex.unlock();

    m_condition.wakeAll();

#ifdef DEBUG_FILE_READ_THREAD
    std::cerr << "FileReadThread::finish() exiting" << std::endl;
#endif
}

int
FileReadThread::request(const Request &request)
{
    m_mutex.lock();
    
    int token = m_nextToken++;
    m_queue[token] = request;

    m_mutex.unlock();
    m_condition.wakeAll();

    return token;
}

void
FileReadThread::cancel(int token)
{
    m_mutex.lock();

    if (m_queue.find(token) != m_queue.end()) {
        m_cancelledRequests[token] = m_queue[token];
        m_queue.erase(token);
        m_newlyCancelled.insert(token);
    } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
        m_cancelledRequests[token] = m_readyRequests[token];
        m_readyRequests.erase(token);
    } else {
        std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
    }

    m_mutex.unlock();

#ifdef DEBUG_FILE_READ_THREAD
    std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
#endif

    m_condition.wakeAll();
}

bool
FileReadThread::isReady(int token)
{
    m_mutex.lock();

    bool ready = m_readyRequests.find(token) != m_readyRequests.end();

    m_mutex.unlock();
    return ready;
}

bool
FileReadThread::isCancelled(int token)
{
    m_mutex.lock();

    bool cancelled = 
        m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
        m_newlyCancelled.find(token) == m_newlyCancelled.end();

    m_mutex.unlock();
    return cancelled;
}

bool
FileReadThread::getRequest(int token, Request &request)
{
    m_mutex.lock();

    bool found = false;

    if (m_queue.find(token) != m_queue.end()) {
        request = m_queue[token];
        found = true;
    } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
        request = m_cancelledRequests[token];
        found = true;
    } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
        request = m_readyRequests[token];
        found = true;
    }

    m_mutex.unlock();
    
    return found;
}

void
FileReadThread::done(int token)
{
    m_mutex.lock();

    bool found = false;

    if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
        m_cancelledRequests.erase(token);
        m_newlyCancelled.erase(token);
        found = true;
    } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
        m_readyRequests.erase(token);
        found = true;
    } else if (m_queue.find(token) != m_queue.end()) {
        std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
    }

    m_mutex.unlock();

    if (!found) {
        std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
    }
}

void
FileReadThread::process()
{
    // entered with m_mutex locked and m_queue non-empty

#ifdef DEBUG_FILE_READ_THREAD
    Profiler profiler("FileReadThread::process()", true);
#endif

    int token = m_queue.begin()->first;
    Request request = m_queue.begin()->second;

    m_mutex.unlock();

#ifdef DEBUG_FILE_READ_THREAD
    std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
#endif

    bool successful = false;
    bool seekFailed = false;
    ssize_t r = 0;

    if (request.mutex) request.mutex->lock();

    if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
        seekFailed = true;
    } else {
        
        // if request.size is large, we want to avoid making a single
        // system call to read it all as it may block too much

        static const size_t blockSize = 256 * 1024;
        
        size_t size = request.size;
        char *destination = request.data;

        while (size > 0) {
            size_t readSize = size;
            if (readSize > blockSize) readSize = blockSize;
            ssize_t br = ::read(request.fd, destination, readSize);
            if (br < 0) { 
                r = br;
                break;
            } else {
                r += br;
                if (br < ssize_t(readSize)) break;
            }
            destination += readSize;
            size -= readSize;
        }
    }

    if (request.mutex) request.mutex->unlock();

    if (seekFailed) {
        ::perror("Seek failed");
        std::cerr << "ERROR: FileReadThread::process: seek to "
                  << request.start << " failed" << std::endl;
        request.size = 0;
    } else {
        if (r < 0) {
            ::perror("ERROR: FileReadThread::process: Read failed");
            request.size = 0;
        } else if (r < ssize_t(request.size)) {
            std::cerr << "WARNING: FileReadThread::process: read "
                      << request.size << " returned only " << r << " bytes"
                      << std::endl;
            request.size = r;
            usleep(100000);
        } else {
            successful = true;
        }
    }
        
    // Check that the token hasn't been cancelled and the thread
    // hasn't been asked to finish
    
    m_mutex.lock();

    request.successful = successful;
        
    if (m_queue.find(token) != m_queue.end() && !m_exiting) {
        m_queue.erase(token);
        m_readyRequests[token] = request;
#ifdef DEBUG_FILE_READ_THREAD
        std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
#endif
    } else {
#ifdef DEBUG_FILE_READ_THREAD
        std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
#endif
    }
}

void
FileReadThread::notifyCancelled()
{
    // entered with m_mutex locked

    while (!m_newlyCancelled.empty()) {

        int token = *m_newlyCancelled.begin();

#ifdef DEBUG_FILE_READ_THREAD
        std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
#endif

        m_newlyCancelled.erase(token);
    }
}