Chris@148: /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ Chris@148: Chris@148: /* Chris@148: Sonic Visualiser Chris@148: An audio file viewer and annotation editor. Chris@148: Centre for Digital Music, Queen Mary, University of London. Chris@148: This file copyright 2006 Chris Cannam. Chris@148: Chris@148: This program is free software; you can redistribute it and/or Chris@148: modify it under the terms of the GNU General Public License as Chris@148: published by the Free Software Foundation; either version 2 of the Chris@148: License, or (at your option) any later version. See the file Chris@148: COPYING included with this distribution for more information. Chris@148: */ Chris@148: Chris@148: #include "FileReadThread.h" Chris@148: Chris@148: #include "base/Profiler.h" Chris@408: #include "base/Thread.h" Chris@148: Chris@148: #include Chris@148: #include Chris@658: #include Chris@148: Chris@455: //#define DEBUG_FILE_READ_THREAD 1 Chris@148: Chris@148: FileReadThread::FileReadThread() : Chris@148: m_nextToken(0), Chris@148: m_exiting(false) Chris@148: { Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::run() Chris@148: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex"); Chris@148: Chris@148: while (!m_exiting) { Chris@148: if (m_queue.empty()) { Chris@148: m_condition.wait(&m_mutex, 1000); Chris@148: } else { Chris@148: process(); Chris@148: } Chris@148: notifyCancelled(); Chris@148: } Chris@148: Chris@148: notifyCancelled(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::run() exiting" << std::endl; Chris@148: #endif Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::finish() Chris@148: { Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::finish()" << std::endl; Chris@148: #endif Chris@148: Chris@408: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex"); Chris@408: Chris@408: while (!m_queue.empty()) { Chris@408: m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; Chris@408: m_newlyCancelled.insert(m_queue.begin()->first); Chris@408: m_queue.erase(m_queue.begin()); Chris@408: } Chris@408: Chris@408: m_exiting = true; Chris@148: } Chris@148: Chris@148: m_condition.wakeAll(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::finish() exiting" << std::endl; Chris@148: #endif Chris@148: } Chris@148: Chris@148: int Chris@148: FileReadThread::request(const Request &request) Chris@148: { Chris@408: int token; Chris@408: Chris@408: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex"); Chris@148: Chris@408: token = m_nextToken++; Chris@408: m_queue[token] = request; Chris@408: } Chris@148: Chris@148: m_condition.wakeAll(); Chris@148: Chris@148: return token; Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::cancel(int token) Chris@148: { Chris@408: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex"); Chris@148: Chris@408: if (m_queue.find(token) != m_queue.end()) { Chris@408: m_cancelledRequests[token] = m_queue[token]; Chris@408: m_queue.erase(token); Chris@408: m_newlyCancelled.insert(token); Chris@408: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@408: m_cancelledRequests[token] = m_readyRequests[token]; Chris@408: m_readyRequests.erase(token); Chris@408: } else { Chris@408: std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; Chris@408: } Chris@148: } Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; Chris@148: #endif Chris@148: Chris@148: m_condition.wakeAll(); Chris@148: } Chris@148: Chris@148: bool Chris@148: FileReadThread::isReady(int token) Chris@148: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex"); Chris@148: Chris@148: bool ready = m_readyRequests.find(token) != m_readyRequests.end(); Chris@148: Chris@148: return ready; Chris@148: } Chris@148: Chris@148: bool Chris@148: FileReadThread::isCancelled(int token) Chris@148: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex"); Chris@148: Chris@148: bool cancelled = Chris@148: m_cancelledRequests.find(token) != m_cancelledRequests.end() && Chris@148: m_newlyCancelled.find(token) == m_newlyCancelled.end(); Chris@148: Chris@148: return cancelled; Chris@148: } Chris@148: Chris@148: bool Chris@455: FileReadThread::haveRequest(int token) Chris@455: { Chris@455: MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex"); Chris@455: Chris@455: bool found = false; Chris@455: Chris@455: if (m_queue.find(token) != m_queue.end()) { Chris@455: found = true; Chris@455: } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { Chris@455: found = true; Chris@455: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@455: found = true; Chris@455: } Chris@455: Chris@455: return found; Chris@455: } Chris@455: Chris@455: bool Chris@148: FileReadThread::getRequest(int token, Request &request) Chris@148: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex"); Chris@148: Chris@148: bool found = false; Chris@148: Chris@148: if (m_queue.find(token) != m_queue.end()) { Chris@148: request = m_queue[token]; Chris@148: found = true; Chris@148: } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { Chris@148: request = m_cancelledRequests[token]; Chris@148: found = true; Chris@148: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@148: request = m_readyRequests[token]; Chris@148: found = true; Chris@148: } Chris@148: Chris@148: return found; Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::done(int token) Chris@148: { Chris@408: MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex"); Chris@148: Chris@148: bool found = false; Chris@148: Chris@148: if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { Chris@148: m_cancelledRequests.erase(token); Chris@148: m_newlyCancelled.erase(token); Chris@148: found = true; Chris@148: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@148: m_readyRequests.erase(token); Chris@148: found = true; Chris@148: } else if (m_queue.find(token) != m_queue.end()) { Chris@148: std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; Chris@148: } Chris@148: Chris@148: if (!found) { Chris@148: std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; Chris@148: } Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::process() Chris@148: { Chris@148: // entered with m_mutex locked and m_queue non-empty Chris@148: Chris@408: Profiler profiler("FileReadThread::process", true); Chris@148: Chris@148: int token = m_queue.begin()->first; Chris@148: Request request = m_queue.begin()->second; Chris@148: Chris@148: m_mutex.unlock(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl; Chris@148: #endif Chris@148: Chris@148: bool successful = false; Chris@148: bool seekFailed = false; Chris@148: ssize_t r = 0; Chris@148: Chris@408: { Chris@408: MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex"); Chris@148: Chris@408: if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { Chris@408: seekFailed = true; Chris@408: } else { Chris@148: Chris@408: // if request.size is large, we want to avoid making a single Chris@408: // system call to read it all as it may block too much Chris@408: Chris@408: static const size_t blockSize = 256 * 1024; Chris@408: Chris@408: size_t size = request.size; Chris@408: char *destination = request.data; Chris@408: Chris@408: while (size > 0) { Chris@408: size_t readSize = size; Chris@408: if (readSize > blockSize) readSize = blockSize; Chris@408: ssize_t br = ::read(request.fd, destination, readSize); Chris@408: if (br < 0) { Chris@408: r = br; Chris@408: break; Chris@408: } else { Chris@408: r += br; Chris@408: if (br < ssize_t(readSize)) break; Chris@408: } Chris@408: destination += readSize; Chris@408: size -= readSize; Chris@148: } Chris@148: } Chris@148: } Chris@148: Chris@148: if (seekFailed) { Chris@148: ::perror("Seek failed"); Chris@148: std::cerr << "ERROR: FileReadThread::process: seek to " Chris@148: << request.start << " failed" << std::endl; Chris@148: request.size = 0; Chris@148: } else { Chris@148: if (r < 0) { Chris@148: ::perror("ERROR: FileReadThread::process: Read failed"); Chris@455: std::cerr << "ERROR: FileReadThread::process: read of " Chris@455: << request.size << " at " Chris@455: << request.start << " failed" << std::endl; Chris@148: request.size = 0; Chris@148: } else if (r < ssize_t(request.size)) { Chris@148: std::cerr << "WARNING: FileReadThread::process: read " Chris@148: << request.size << " returned only " << r << " bytes" Chris@148: << std::endl; Chris@148: request.size = r; Chris@148: usleep(100000); Chris@148: } else { Chris@148: successful = true; Chris@148: } Chris@148: } Chris@148: Chris@148: // Check that the token hasn't been cancelled and the thread Chris@148: // hasn't been asked to finish Chris@148: Chris@148: m_mutex.lock(); Chris@148: Chris@148: request.successful = successful; Chris@148: Chris@148: if (m_queue.find(token) != m_queue.end() && !m_exiting) { Chris@148: m_queue.erase(token); Chris@148: m_readyRequests[token] = request; Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@455: std::cerr << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << std::endl; Chris@148: #endif Chris@148: } else { Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@455: if (m_exiting) { Chris@455: std::cerr << "FileReadThread::process: exiting" << std::endl; Chris@455: } else { Chris@455: std::cerr << "FileReadThread::process: request disappeared" << std::endl; Chris@455: } Chris@148: #endif Chris@148: } Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::notifyCancelled() Chris@148: { Chris@148: // entered with m_mutex locked Chris@148: Chris@148: while (!m_newlyCancelled.empty()) { Chris@148: Chris@148: int token = *m_newlyCancelled.begin(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl; Chris@148: #endif Chris@148: Chris@148: m_newlyCancelled.erase(token); Chris@148: } Chris@148: } Chris@148: Chris@148: