Chris@148: /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ Chris@148: Chris@148: /* Chris@148: Sonic Visualiser Chris@148: An audio file viewer and annotation editor. Chris@148: Centre for Digital Music, Queen Mary, University of London. Chris@148: This file copyright 2006 Chris Cannam. Chris@148: Chris@148: This program is free software; you can redistribute it and/or Chris@148: modify it under the terms of the GNU General Public License as Chris@148: published by the Free Software Foundation; either version 2 of the Chris@148: License, or (at your option) any later version. See the file Chris@148: COPYING included with this distribution for more information. Chris@148: */ Chris@148: Chris@148: #include "FileReadThread.h" Chris@148: Chris@148: #include "base/Profiler.h" Chris@148: Chris@148: #include Chris@148: #include Chris@148: Chris@148: //#define DEBUG_FILE_READ_THREAD 1 Chris@148: Chris@148: FileReadThread::FileReadThread() : Chris@148: m_nextToken(0), Chris@148: m_exiting(false) Chris@148: { Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::run() Chris@148: { Chris@148: m_mutex.lock(); Chris@148: Chris@148: while (!m_exiting) { Chris@148: if (m_queue.empty()) { Chris@148: m_condition.wait(&m_mutex, 1000); Chris@148: } else { Chris@148: process(); Chris@148: } Chris@148: notifyCancelled(); Chris@148: } Chris@148: Chris@148: notifyCancelled(); Chris@148: m_mutex.unlock(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::run() exiting" << std::endl; Chris@148: #endif Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::finish() Chris@148: { Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::finish()" << std::endl; Chris@148: #endif Chris@148: Chris@148: m_mutex.lock(); Chris@148: while (!m_queue.empty()) { Chris@148: m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; Chris@148: m_newlyCancelled.insert(m_queue.begin()->first); Chris@148: m_queue.erase(m_queue.begin()); Chris@148: } Chris@148: Chris@148: m_exiting = true; Chris@148: m_mutex.unlock(); Chris@148: Chris@148: m_condition.wakeAll(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::finish() exiting" << std::endl; Chris@148: #endif Chris@148: } Chris@148: Chris@148: int Chris@148: FileReadThread::request(const Request &request) Chris@148: { Chris@148: m_mutex.lock(); Chris@148: Chris@148: int token = m_nextToken++; Chris@148: m_queue[token] = request; Chris@148: Chris@148: m_mutex.unlock(); Chris@148: m_condition.wakeAll(); Chris@148: Chris@148: return token; Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::cancel(int token) Chris@148: { Chris@148: m_mutex.lock(); Chris@148: Chris@148: if (m_queue.find(token) != m_queue.end()) { Chris@148: m_cancelledRequests[token] = m_queue[token]; Chris@148: m_queue.erase(token); Chris@148: m_newlyCancelled.insert(token); Chris@148: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@148: m_cancelledRequests[token] = m_readyRequests[token]; Chris@148: m_readyRequests.erase(token); Chris@148: } else { Chris@148: std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; Chris@148: } Chris@148: Chris@148: m_mutex.unlock(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; Chris@148: #endif Chris@148: Chris@148: m_condition.wakeAll(); Chris@148: } Chris@148: Chris@148: bool Chris@148: FileReadThread::isReady(int token) Chris@148: { Chris@148: m_mutex.lock(); Chris@148: Chris@148: bool ready = m_readyRequests.find(token) != m_readyRequests.end(); Chris@148: Chris@148: m_mutex.unlock(); Chris@148: return ready; Chris@148: } Chris@148: Chris@148: bool Chris@148: FileReadThread::isCancelled(int token) Chris@148: { Chris@148: m_mutex.lock(); Chris@148: Chris@148: bool cancelled = Chris@148: m_cancelledRequests.find(token) != m_cancelledRequests.end() && Chris@148: m_newlyCancelled.find(token) == m_newlyCancelled.end(); Chris@148: Chris@148: m_mutex.unlock(); Chris@148: return cancelled; Chris@148: } Chris@148: Chris@148: bool Chris@148: FileReadThread::getRequest(int token, Request &request) Chris@148: { Chris@148: m_mutex.lock(); Chris@148: Chris@148: bool found = false; Chris@148: Chris@148: if (m_queue.find(token) != m_queue.end()) { Chris@148: request = m_queue[token]; Chris@148: found = true; Chris@148: } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { Chris@148: request = m_cancelledRequests[token]; Chris@148: found = true; Chris@148: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@148: request = m_readyRequests[token]; Chris@148: found = true; Chris@148: } Chris@148: Chris@148: m_mutex.unlock(); Chris@148: Chris@148: return found; Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::done(int token) Chris@148: { Chris@148: m_mutex.lock(); Chris@148: Chris@148: bool found = false; Chris@148: Chris@148: if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { Chris@148: m_cancelledRequests.erase(token); Chris@148: m_newlyCancelled.erase(token); Chris@148: found = true; Chris@148: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@148: m_readyRequests.erase(token); Chris@148: found = true; Chris@148: } else if (m_queue.find(token) != m_queue.end()) { Chris@148: std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; Chris@148: } Chris@148: Chris@148: m_mutex.unlock(); Chris@148: Chris@148: if (!found) { Chris@148: std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; Chris@148: } Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::process() Chris@148: { Chris@148: // entered with m_mutex locked and m_queue non-empty Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: Profiler profiler("FileReadThread::process()", true); Chris@148: #endif Chris@148: Chris@148: int token = m_queue.begin()->first; Chris@148: Request request = m_queue.begin()->second; Chris@148: Chris@148: m_mutex.unlock(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl; Chris@148: #endif Chris@148: Chris@148: bool successful = false; Chris@148: bool seekFailed = false; Chris@148: ssize_t r = 0; Chris@148: Chris@148: if (request.mutex) request.mutex->lock(); Chris@148: Chris@148: if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { Chris@148: seekFailed = true; Chris@148: } else { Chris@148: Chris@148: // if request.size is large, we want to avoid making a single Chris@148: // system call to read it all as it may block too much Chris@148: Chris@148: static const size_t blockSize = 256 * 1024; Chris@148: Chris@148: size_t size = request.size; Chris@148: char *destination = request.data; Chris@148: Chris@148: while (size > 0) { Chris@148: size_t readSize = size; Chris@148: if (readSize > blockSize) readSize = blockSize; Chris@148: ssize_t br = ::read(request.fd, destination, readSize); Chris@148: if (br < 0) { Chris@148: r = br; Chris@148: break; Chris@148: } else { Chris@148: r += br; Chris@148: if (br < ssize_t(readSize)) break; Chris@148: } Chris@148: destination += readSize; Chris@148: size -= readSize; Chris@148: } Chris@148: } Chris@148: Chris@148: if (request.mutex) request.mutex->unlock(); Chris@148: Chris@148: if (seekFailed) { Chris@148: ::perror("Seek failed"); Chris@148: std::cerr << "ERROR: FileReadThread::process: seek to " Chris@148: << request.start << " failed" << std::endl; Chris@148: request.size = 0; Chris@148: } else { Chris@148: if (r < 0) { Chris@148: ::perror("ERROR: FileReadThread::process: Read failed"); Chris@148: request.size = 0; Chris@148: } else if (r < ssize_t(request.size)) { Chris@148: std::cerr << "WARNING: FileReadThread::process: read " Chris@148: << request.size << " returned only " << r << " bytes" Chris@148: << std::endl; Chris@148: request.size = r; Chris@148: usleep(100000); Chris@148: } else { Chris@148: successful = true; Chris@148: } Chris@148: } Chris@148: Chris@148: // Check that the token hasn't been cancelled and the thread Chris@148: // hasn't been asked to finish Chris@148: Chris@148: m_mutex.lock(); Chris@148: Chris@148: request.successful = successful; Chris@148: Chris@148: if (m_queue.find(token) != m_queue.end() && !m_exiting) { Chris@148: m_queue.erase(token); Chris@148: m_readyRequests[token] = request; Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::process: done, marking as ready" << std::endl; Chris@148: #endif Chris@148: } else { Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl; Chris@148: #endif Chris@148: } Chris@148: } Chris@148: Chris@148: void Chris@148: FileReadThread::notifyCancelled() Chris@148: { Chris@148: // entered with m_mutex locked Chris@148: Chris@148: while (!m_newlyCancelled.empty()) { Chris@148: Chris@148: int token = *m_newlyCancelled.begin(); Chris@148: Chris@148: #ifdef DEBUG_FILE_READ_THREAD Chris@148: std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl; Chris@148: #endif Chris@148: Chris@148: m_newlyCancelled.erase(token); Chris@148: } Chris@148: } Chris@148: Chris@148: