Chris@95: /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ Chris@95: Chris@95: /* Chris@95: Sonic Visualiser Chris@95: An audio file viewer and annotation editor. Chris@95: Centre for Digital Music, Queen Mary, University of London. Chris@95: This file copyright 2006 Chris Cannam. Chris@95: Chris@95: This program is free software; you can redistribute it and/or Chris@95: modify it under the terms of the GNU General Public License as Chris@95: published by the Free Software Foundation; either version 2 of the Chris@95: License, or (at your option) any later version. See the file Chris@95: COPYING included with this distribution for more information. Chris@95: */ Chris@95: Chris@95: #include "FileReadThread.h" Chris@95: Chris@95: #include Chris@100: #include Chris@95: Chris@95: FileReadThread::FileReadThread() : Chris@95: m_nextToken(0), Chris@95: m_exiting(false) Chris@95: { Chris@95: } Chris@95: Chris@95: void Chris@95: FileReadThread::run() Chris@95: { Chris@95: m_mutex.lock(); Chris@95: Chris@95: while (!m_exiting) { Chris@95: if (m_queue.empty()) { Chris@95: m_condition.wait(&m_mutex, 1000); Chris@95: } else { Chris@95: process(); Chris@95: } Chris@96: notifyCancelled(); Chris@95: } Chris@95: Chris@95: notifyCancelled(); Chris@95: m_mutex.unlock(); Chris@95: Chris@95: std::cerr << "FileReadThread::run() exiting" << std::endl; Chris@95: } Chris@95: Chris@95: void Chris@95: FileReadThread::finish() Chris@95: { Chris@95: std::cerr << "FileReadThread::finish()" << std::endl; Chris@95: Chris@95: m_mutex.lock(); Chris@95: while (!m_queue.empty()) { Chris@95: m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; Chris@95: m_newlyCancelled.insert(m_queue.begin()->first); Chris@95: m_queue.erase(m_queue.begin()); Chris@95: } Chris@95: Chris@95: m_exiting = true; Chris@95: m_mutex.unlock(); Chris@95: Chris@95: m_condition.wakeAll(); Chris@95: Chris@95: std::cerr << "FileReadThread::finish() exiting" << std::endl; Chris@95: } Chris@95: Chris@95: int Chris@95: FileReadThread::request(const Request &request) Chris@95: { Chris@95: m_mutex.lock(); Chris@95: Chris@95: int token = m_nextToken++; Chris@95: m_queue[token] = request; Chris@95: Chris@95: m_mutex.unlock(); Chris@95: m_condition.wakeAll(); Chris@95: Chris@95: return token; Chris@95: } Chris@95: Chris@95: void Chris@95: FileReadThread::cancel(int token) Chris@95: { Chris@95: m_mutex.lock(); Chris@95: Chris@95: if (m_queue.find(token) != m_queue.end()) { Chris@95: m_cancelledRequests[token] = m_queue[token]; Chris@95: m_queue.erase(token); Chris@95: m_newlyCancelled.insert(token); Chris@96: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@96: m_cancelledRequests[token] = m_readyRequests[token]; Chris@96: m_readyRequests.erase(token); Chris@96: } else { Chris@96: std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; Chris@95: } Chris@95: Chris@95: m_mutex.unlock(); Chris@96: Chris@96: std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; Chris@96: Chris@96: m_condition.wakeAll(); Chris@95: } Chris@95: Chris@95: bool Chris@95: FileReadThread::isReady(int token) Chris@95: { Chris@95: m_mutex.lock(); Chris@95: Chris@95: bool ready = m_readyRequests.find(token) != m_readyRequests.end(); Chris@95: Chris@95: m_mutex.unlock(); Chris@95: return ready; Chris@95: } Chris@95: Chris@95: bool Chris@95: FileReadThread::isCancelled(int token) Chris@95: { Chris@95: m_mutex.lock(); Chris@95: Chris@95: bool cancelled = Chris@95: m_cancelledRequests.find(token) != m_cancelledRequests.end() && Chris@95: m_newlyCancelled.find(token) == m_newlyCancelled.end(); Chris@95: Chris@95: m_mutex.unlock(); Chris@95: return cancelled; Chris@95: } Chris@95: Chris@95: bool Chris@95: FileReadThread::getRequest(int token, Request &request) Chris@95: { Chris@95: m_mutex.lock(); Chris@95: Chris@95: bool found = false; Chris@95: Chris@95: if (m_queue.find(token) != m_queue.end()) { Chris@95: request = m_queue[token]; Chris@95: found = true; Chris@95: } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { Chris@95: request = m_cancelledRequests[token]; Chris@95: found = true; Chris@95: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@95: request = m_readyRequests[token]; Chris@95: found = true; Chris@95: } Chris@95: Chris@95: m_mutex.unlock(); Chris@95: Chris@95: return found; Chris@95: } Chris@95: Chris@95: void Chris@95: FileReadThread::done(int token) Chris@95: { Chris@95: m_mutex.lock(); Chris@95: Chris@95: bool found = false; Chris@95: Chris@95: if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { Chris@95: m_cancelledRequests.erase(token); Chris@95: m_newlyCancelled.erase(token); Chris@95: found = true; Chris@95: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { Chris@95: m_readyRequests.erase(token); Chris@95: found = true; Chris@95: } else if (m_queue.find(token) != m_queue.end()) { Chris@95: std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; Chris@95: } Chris@95: Chris@95: m_mutex.unlock(); Chris@95: Chris@95: if (!found) { Chris@95: std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; Chris@95: } Chris@95: } Chris@95: Chris@95: void Chris@95: FileReadThread::process() Chris@95: { Chris@95: // entered with m_mutex locked and m_queue non-empty Chris@95: Chris@95: int token = m_queue.begin()->first; Chris@95: Request request = m_queue.begin()->second; Chris@95: Chris@95: m_mutex.unlock(); Chris@95: Chris@95: std::cerr << "FileReadThread::process: got something to do" << std::endl; Chris@95: Chris@95: bool successful = false; Chris@95: bool seekFailed = false; Chris@95: ssize_t r = 0; Chris@95: Chris@95: if (request.mutex) request.mutex->lock(); Chris@95: Chris@95: if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { Chris@95: seekFailed = true; Chris@95: } else { Chris@96: Chris@96: // if request.size is large, we want to avoid making a single Chris@96: // system call to read it all as it may block too much Chris@96: Chris@96: static const size_t blockSize = 16384; Chris@96: Chris@96: size_t size = request.size; Chris@96: char *destination = request.data; Chris@96: Chris@96: while (size > 0) { Chris@96: size_t readSize = size; Chris@96: if (readSize > blockSize) readSize = blockSize; Chris@96: ssize_t br = ::read(request.fd, destination, readSize); Chris@96: if (br < 0) { Chris@96: r = br; Chris@96: break; Chris@96: } else { Chris@96: r += br; Chris@96: if (br < ssize_t(readSize)) break; Chris@96: } Chris@96: destination += readSize; Chris@96: size -= readSize; Chris@96: } Chris@95: } Chris@95: Chris@95: if (request.mutex) request.mutex->unlock(); Chris@95: Chris@95: if (seekFailed) { Chris@95: ::perror("Seek failed"); Chris@95: std::cerr << "ERROR: FileReadThread::process: seek to " Chris@95: << request.start << " failed" << std::endl; Chris@95: request.size = 0; Chris@95: } else { Chris@95: if (r < 0) { Chris@95: ::perror("ERROR: FileReadThread::process: Read failed"); Chris@95: request.size = 0; Chris@95: } else if (r < ssize_t(request.size)) { Chris@95: std::cerr << "WARNING: FileReadThread::process: read " Chris@95: << request.size << " returned only " << r << " bytes" Chris@95: << std::endl; Chris@95: request.size = r; Chris@95: } else { Chris@95: successful = true; Chris@95: } Chris@95: } Chris@95: Chris@95: // Check that the token hasn't been cancelled and the thread Chris@95: // hasn't been asked to finish Chris@95: Chris@95: m_mutex.lock(); Chris@95: Chris@95: if (m_queue.find(token) != m_queue.end() && !m_exiting) { Chris@95: m_queue.erase(token); Chris@95: m_readyRequests[token] = request; Chris@95: m_mutex.unlock(); Chris@95: std::cerr << "emitting" << std::endl; Chris@95: emit ready(token, successful); Chris@95: m_mutex.lock(); Chris@95: } Chris@95: } Chris@95: Chris@95: void Chris@95: FileReadThread::notifyCancelled() Chris@95: { Chris@95: // entered with m_mutex locked Chris@95: Chris@95: while (!m_newlyCancelled.empty()) { Chris@96: Chris@95: int token = *m_newlyCancelled.begin(); Chris@96: Chris@96: std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl; Chris@96: Chris@95: m_newlyCancelled.erase(token); Chris@95: emit cancelled(token); Chris@95: } Chris@95: } Chris@95: Chris@95: