Chris@148: /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*-  vi:set ts=8 sts=4 sw=4: */
Chris@148: 
Chris@148: /*
Chris@148:     Sonic Visualiser
Chris@148:     An audio file viewer and annotation editor.
Chris@148:     Centre for Digital Music, Queen Mary, University of London.
Chris@148:     This file copyright 2006 Chris Cannam.
Chris@148:     
Chris@148:     This program is free software; you can redistribute it and/or
Chris@148:     modify it under the terms of the GNU General Public License as
Chris@148:     published by the Free Software Foundation; either version 2 of the
Chris@148:     License, or (at your option) any later version.  See the file
Chris@148:     COPYING included with this distribution for more information.
Chris@148: */
Chris@148: 
#include "FileReadThread.h"

#include "base/Profiler.h"
#include "base/Thread.h"

#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <iostream>
Chris@148: 
Chris@455: //#define DEBUG_FILE_READ_THREAD 1
Chris@148: 
Chris@148: FileReadThread::FileReadThread() :
Chris@148:     m_nextToken(0),
Chris@148:     m_exiting(false)
Chris@148: {
Chris@148: }
Chris@148: 
Chris@148: void
Chris@148: FileReadThread::run()
Chris@148: {
Chris@408:     MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148: 
Chris@148:     while (!m_exiting) {
Chris@148:         if (m_queue.empty()) {
Chris@148:             m_condition.wait(&m_mutex, 1000);
Chris@148:         } else {
Chris@148:             process();
Chris@148:         }
Chris@148:         notifyCancelled();
Chris@148:     }
Chris@148: 
Chris@148:     notifyCancelled();
Chris@148: 
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@690:     SVDEBUG << "FileReadThread::run() exiting" << endl;
Chris@148: #endif
Chris@148: }
Chris@148: 
Chris@148: void
Chris@148: FileReadThread::finish()
Chris@148: {
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@690:     SVDEBUG << "FileReadThread::finish()" << endl;
Chris@148: #endif
Chris@148: 
Chris@408:     {
Chris@408:         MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408: 
Chris@408:         while (!m_queue.empty()) {
Chris@408:             m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408:             m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408:             m_queue.erase(m_queue.begin());
Chris@408:         }
Chris@408:         
Chris@408:         m_exiting = true;
Chris@148:     }
Chris@148: 
Chris@148:     m_condition.wakeAll();
Chris@148: 
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@690:     SVDEBUG << "FileReadThread::finish() exiting" << endl;
Chris@148: #endif
Chris@148: }
Chris@148: 
Chris@148: int
Chris@148: FileReadThread::request(const Request &request)
Chris@148: {
Chris@408:     int token;
Chris@408: 
Chris@408:     {
Chris@408:         MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148:     
Chris@408:         token = m_nextToken++;
Chris@408:         m_queue[token] = request;
Chris@408:     }
Chris@148: 
Chris@148:     m_condition.wakeAll();
Chris@148: 
Chris@148:     return token;
Chris@148: }
Chris@148: 
Chris@148: void
Chris@148: FileReadThread::cancel(int token)
Chris@148: {
Chris@408:     {
Chris@408:         MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148: 
Chris@408:         if (m_queue.find(token) != m_queue.end()) {
Chris@408:             m_cancelledRequests[token] = m_queue[token];
Chris@408:             m_queue.erase(token);
Chris@408:             m_newlyCancelled.insert(token);
Chris@408:         } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408:             m_cancelledRequests[token] = m_readyRequests[token];
Chris@408:             m_readyRequests.erase(token);
Chris@408:         } else {
Chris@843:             cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
Chris@408:         }
Chris@148:     }
Chris@148: 
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@690:     SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
Chris@148: #endif
Chris@148: 
Chris@148:     m_condition.wakeAll();
Chris@148: }
Chris@148: 
Chris@148: bool
Chris@148: FileReadThread::isReady(int token)
Chris@148: {
Chris@408:     MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148: 
Chris@148:     bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148: 
Chris@148:     return ready;
Chris@148: }
Chris@148: 
Chris@148: bool
Chris@148: FileReadThread::isCancelled(int token)
Chris@148: {
Chris@408:     MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148: 
Chris@148:     bool cancelled = 
Chris@148:         m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148:         m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148: 
Chris@148:     return cancelled;
Chris@148: }
Chris@148: 
Chris@148: bool
Chris@455: FileReadThread::haveRequest(int token)
Chris@455: {
Chris@455:     MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
Chris@455: 
Chris@455:     bool found = false;
Chris@455: 
Chris@455:     if (m_queue.find(token) != m_queue.end()) {
Chris@455:         found = true;
Chris@455:     } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@455:         found = true;
Chris@455:     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@455:         found = true;
Chris@455:     }
Chris@455: 
Chris@455:     return found;
Chris@455: }
Chris@455: 
Chris@455: bool
Chris@148: FileReadThread::getRequest(int token, Request &request)
Chris@148: {
Chris@408:     MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148: 
Chris@148:     bool found = false;
Chris@148: 
Chris@148:     if (m_queue.find(token) != m_queue.end()) {
Chris@148:         request = m_queue[token];
Chris@148:         found = true;
Chris@148:     } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148:         request = m_cancelledRequests[token];
Chris@148:         found = true;
Chris@148:     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148:         request = m_readyRequests[token];
Chris@148:         found = true;
Chris@148:     }
Chris@148: 
Chris@148:     return found;
Chris@148: }
Chris@148: 
Chris@148: void
Chris@148: FileReadThread::done(int token)
Chris@148: {
Chris@408:     MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148: 
Chris@148:     bool found = false;
Chris@148: 
Chris@148:     if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148:         m_cancelledRequests.erase(token);
Chris@148:         m_newlyCancelled.erase(token);
Chris@148:         found = true;
Chris@148:     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148:         m_readyRequests.erase(token);
Chris@148:         found = true;
Chris@148:     } else if (m_queue.find(token) != m_queue.end()) {
Chris@843:         cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
Chris@148:     }
Chris@148: 
Chris@148:     if (!found) {
Chris@843:         cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
Chris@148:     }
Chris@148: }
Chris@148: 
Chris@148: void
Chris@148: FileReadThread::process()
Chris@148: {
Chris@148:     // entered with m_mutex locked and m_queue non-empty
Chris@148: 
Chris@408:     Profiler profiler("FileReadThread::process", true);
Chris@148: 
Chris@148:     int token = m_queue.begin()->first;
Chris@148:     Request request = m_queue.begin()->second;
Chris@148: 
Chris@148:     m_mutex.unlock();
Chris@148: 
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@690:     SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
Chris@148: #endif
Chris@148: 
Chris@148:     bool successful = false;
Chris@148:     bool seekFailed = false;
Chris@148:     ssize_t r = 0;
Chris@148: 
Chris@408:     { 
Chris@408:         MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148: 
Chris@408:         if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408:             seekFailed = true;
Chris@408:         } else {
Chris@148:         
Chris@408:             // if request.size is large, we want to avoid making a single
Chris@408:             // system call to read it all as it may block too much
Chris@408:             
Chris@408:             static const size_t blockSize = 256 * 1024;
Chris@408:             
Chris@408:             size_t size = request.size;
Chris@408:             char *destination = request.data;
Chris@408:             
Chris@408:             while (size > 0) {
Chris@408:                 size_t readSize = size;
Chris@408:                 if (readSize > blockSize) readSize = blockSize;
Chris@408:                 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408:                 if (br < 0) { 
Chris@408:                     r = br;
Chris@408:                     break;
Chris@408:                 } else {
Chris@408:                     r += br;
Chris@408:                     if (br < ssize_t(readSize)) break;
Chris@408:                 }
Chris@408:                 destination += readSize;
Chris@408:                 size -= readSize;
Chris@148:             }
Chris@148:         }
Chris@148:     }
Chris@148: 
Chris@148:     if (seekFailed) {
Chris@148:         ::perror("Seek failed");
Chris@843:         cerr << "ERROR: FileReadThread::process: seek to "
Chris@843:                   << request.start << " failed" << endl;
Chris@148:         request.size = 0;
Chris@148:     } else {
Chris@148:         if (r < 0) {
Chris@148:             ::perror("ERROR: FileReadThread::process: Read failed");
Chris@843:             cerr << "ERROR: FileReadThread::process: read of "
Chris@455:                       << request.size << " at "
Chris@843:                       << request.start << " failed" << endl;
Chris@148:             request.size = 0;
Chris@148:         } else if (r < ssize_t(request.size)) {
Chris@843:             cerr << "WARNING: FileReadThread::process: read "
Chris@148:                       << request.size << " returned only " << r << " bytes"
Chris@843:                       << endl;
Chris@148:             request.size = r;
Chris@148:             usleep(100000);
Chris@148:         } else {
Chris@148:             successful = true;
Chris@148:         }
Chris@148:     }
Chris@148:         
Chris@148:     // Check that the token hasn't been cancelled and the thread
Chris@148:     // hasn't been asked to finish
Chris@148:     
Chris@148:     m_mutex.lock();
Chris@148: 
Chris@148:     request.successful = successful;
Chris@148:         
Chris@148:     if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148:         m_queue.erase(token);
Chris@148:         m_readyRequests[token] = request;
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@690:         SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
Chris@148: #endif
Chris@148:     } else {
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@455:         if (m_exiting) {
Chris@690:             SVDEBUG << "FileReadThread::process: exiting" << endl;
Chris@455:         } else {
Chris@690:             SVDEBUG << "FileReadThread::process: request disappeared" << endl;
Chris@455:         }
Chris@148: #endif
Chris@148:     }
Chris@148: }
Chris@148: 
Chris@148: void
Chris@148: FileReadThread::notifyCancelled()
Chris@148: {
Chris@148:     // entered with m_mutex locked
Chris@148: 
Chris@148:     while (!m_newlyCancelled.empty()) {
Chris@148: 
Chris@148:         int token = *m_newlyCancelled.begin();
Chris@148: 
Chris@148: #ifdef DEBUG_FILE_READ_THREAD
Chris@690:         SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
Chris@148: #endif
Chris@148: 
Chris@148:         m_newlyCancelled.erase(token);
Chris@148:     }
Chris@148: }
Chris@148:         
Chris@148: