lbajardsilogic@0: /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ lbajardsilogic@0: lbajardsilogic@0: /* lbajardsilogic@0: Sonic Visualiser lbajardsilogic@0: An audio file viewer and annotation editor. lbajardsilogic@0: Centre for Digital Music, Queen Mary, University of London. lbajardsilogic@0: This file copyright 2006 Chris Cannam. lbajardsilogic@0: lbajardsilogic@0: This program is free software; you can redistribute it and/or lbajardsilogic@0: modify it under the terms of the GNU General Public License as lbajardsilogic@0: published by the Free Software Foundation; either version 2 of the lbajardsilogic@0: License, or (at your option) any later version. See the file lbajardsilogic@0: COPYING included with this distribution for more information. lbajardsilogic@0: */ lbajardsilogic@0: lbajardsilogic@0: #include "FileReadThread.h" lbajardsilogic@0: lbajardsilogic@0: #include "base/Profiler.h" lbajardsilogic@0: lbajardsilogic@0: #include lbajardsilogic@0: //#include lbajardsilogic@0: lbajardsilogic@0: //#define DEBUG_FILE_READ_THREAD 1 lbajardsilogic@0: lbajardsilogic@0: FileReadThread::FileReadThread() : lbajardsilogic@0: m_nextToken(0), lbajardsilogic@0: m_exiting(false) lbajardsilogic@0: { lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: void lbajardsilogic@0: FileReadThread::run() lbajardsilogic@0: { lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: while (!m_exiting) { lbajardsilogic@0: if (m_queue.empty()) { lbajardsilogic@0: m_condition.wait(&m_mutex, 1000); lbajardsilogic@0: } else { lbajardsilogic@0: process(); lbajardsilogic@0: } lbajardsilogic@0: notifyCancelled(); lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: notifyCancelled(); lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::run() exiting" << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: void lbajardsilogic@0: FileReadThread::finish() lbajardsilogic@0: { lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::finish()" << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: while (!m_queue.empty()) { lbajardsilogic@0: m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; lbajardsilogic@0: m_newlyCancelled.insert(m_queue.begin()->first); lbajardsilogic@0: m_queue.erase(m_queue.begin()); lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: m_exiting = true; lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: lbajardsilogic@0: m_condition.wakeAll(); lbajardsilogic@0: lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::finish() exiting" << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: int lbajardsilogic@0: FileReadThread::request(const Request &request) lbajardsilogic@0: { lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: int token = m_nextToken++; lbajardsilogic@0: m_queue[token] = request; lbajardsilogic@0: lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: m_condition.wakeAll(); lbajardsilogic@0: lbajardsilogic@0: return token; lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: void lbajardsilogic@0: FileReadThread::cancel(int token) lbajardsilogic@0: { lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: if (m_queue.find(token) != m_queue.end()) { lbajardsilogic@0: m_cancelledRequests[token] = m_queue[token]; lbajardsilogic@0: m_queue.erase(token); lbajardsilogic@0: m_newlyCancelled.insert(token); lbajardsilogic@0: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { lbajardsilogic@0: m_cancelledRequests[token] = m_readyRequests[token]; lbajardsilogic@0: m_readyRequests.erase(token); lbajardsilogic@0: } else { lbajardsilogic@0: std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: lbajardsilogic@0: m_condition.wakeAll(); lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: bool lbajardsilogic@0: FileReadThread::isReady(int token) lbajardsilogic@0: { lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: bool ready = m_readyRequests.find(token) != m_readyRequests.end(); lbajardsilogic@0: lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: return ready; lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: bool lbajardsilogic@0: FileReadThread::isCancelled(int token) lbajardsilogic@0: { lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: bool cancelled = lbajardsilogic@0: m_cancelledRequests.find(token) != m_cancelledRequests.end() && lbajardsilogic@0: m_newlyCancelled.find(token) == m_newlyCancelled.end(); lbajardsilogic@0: lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: return cancelled; lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: bool lbajardsilogic@0: FileReadThread::getRequest(int token, Request &request) lbajardsilogic@0: { lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: bool found = false; lbajardsilogic@0: lbajardsilogic@0: if (m_queue.find(token) != m_queue.end()) { lbajardsilogic@0: request = m_queue[token]; lbajardsilogic@0: found = true; lbajardsilogic@0: } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { lbajardsilogic@0: request = m_cancelledRequests[token]; lbajardsilogic@0: found = true; lbajardsilogic@0: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { lbajardsilogic@0: request = m_readyRequests[token]; lbajardsilogic@0: found = true; lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: lbajardsilogic@0: return found; lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: void lbajardsilogic@0: FileReadThread::done(int token) lbajardsilogic@0: { lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: bool found = false; lbajardsilogic@0: lbajardsilogic@0: if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { lbajardsilogic@0: m_cancelledRequests.erase(token); lbajardsilogic@0: m_newlyCancelled.erase(token); lbajardsilogic@0: found = true; lbajardsilogic@0: } else if (m_readyRequests.find(token) != m_readyRequests.end()) { lbajardsilogic@0: m_readyRequests.erase(token); lbajardsilogic@0: found = true; lbajardsilogic@0: } else if (m_queue.find(token) != m_queue.end()) { lbajardsilogic@0: std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: lbajardsilogic@0: if (!found) { lbajardsilogic@0: std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; lbajardsilogic@0: } lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: void lbajardsilogic@0: FileReadThread::process() lbajardsilogic@0: { lbajardsilogic@0: // entered with m_mutex locked and m_queue non-empty lbajardsilogic@0: lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: Profiler profiler("FileReadThread::process()", true); lbajardsilogic@0: #endif lbajardsilogic@0: lbajardsilogic@0: int token = m_queue.begin()->first; lbajardsilogic@0: Request request = m_queue.begin()->second; lbajardsilogic@0: lbajardsilogic@0: m_mutex.unlock(); lbajardsilogic@0: lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: lbajardsilogic@0: bool successful = false; lbajardsilogic@0: bool seekFailed = false; lbajardsilogic@0: ssize_t r = 0; lbajardsilogic@0: lbajardsilogic@0: if (request.mutex) request.mutex->lock(); lbajardsilogic@0: lbajardsilogic@0: if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { lbajardsilogic@0: seekFailed = true; lbajardsilogic@0: } else { lbajardsilogic@0: lbajardsilogic@0: // if request.size is large, we want to avoid making a single lbajardsilogic@0: // system call to read it all as it may block too much lbajardsilogic@0: lbajardsilogic@0: static const size_t blockSize = 256 * 1024; lbajardsilogic@0: lbajardsilogic@0: size_t size = request.size; lbajardsilogic@0: char *destination = request.data; lbajardsilogic@0: lbajardsilogic@0: while (size > 0) { lbajardsilogic@0: size_t readSize = size; lbajardsilogic@0: if (readSize > blockSize) readSize = blockSize; lbajardsilogic@0: ssize_t br = ::read(request.fd, destination, readSize); lbajardsilogic@0: if (br < 0) { lbajardsilogic@0: r = br; lbajardsilogic@0: break; lbajardsilogic@0: } else { lbajardsilogic@0: r += br; lbajardsilogic@0: if (br < ssize_t(readSize)) break; lbajardsilogic@0: } lbajardsilogic@0: destination += readSize; lbajardsilogic@0: size -= readSize; lbajardsilogic@0: } lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: if (request.mutex) request.mutex->unlock(); lbajardsilogic@0: lbajardsilogic@0: if (seekFailed) { lbajardsilogic@0: ::perror("Seek failed"); lbajardsilogic@0: std::cerr << "ERROR: FileReadThread::process: seek to " lbajardsilogic@0: << request.start << " failed" << std::endl; lbajardsilogic@0: request.size = 0; lbajardsilogic@0: } else { lbajardsilogic@0: if (r < 0) { lbajardsilogic@0: ::perror("ERROR: FileReadThread::process: Read failed"); lbajardsilogic@0: request.size = 0; lbajardsilogic@0: } else if (r < ssize_t(request.size)) { lbajardsilogic@0: std::cerr << "WARNING: FileReadThread::process: read " lbajardsilogic@0: << request.size << " returned only " << r << " bytes" lbajardsilogic@0: << std::endl; lbajardsilogic@0: request.size = r; lbajardsilogic@0: usleep(100000); lbajardsilogic@0: } else { lbajardsilogic@0: successful = true; lbajardsilogic@0: } lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: // Check that the token hasn't been cancelled and the thread lbajardsilogic@0: // hasn't been asked to finish lbajardsilogic@0: lbajardsilogic@0: m_mutex.lock(); lbajardsilogic@0: lbajardsilogic@0: request.successful = successful; lbajardsilogic@0: lbajardsilogic@0: if (m_queue.find(token) != m_queue.end() && !m_exiting) { lbajardsilogic@0: m_queue.erase(token); lbajardsilogic@0: m_readyRequests[token] = request; lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::process: done, marking as ready" << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: } else { lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: } lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: void lbajardsilogic@0: FileReadThread::notifyCancelled() lbajardsilogic@0: { lbajardsilogic@0: // entered with m_mutex locked lbajardsilogic@0: lbajardsilogic@0: while (!m_newlyCancelled.empty()) { lbajardsilogic@0: lbajardsilogic@0: int token = *m_newlyCancelled.begin(); lbajardsilogic@0: lbajardsilogic@0: #ifdef DEBUG_FILE_READ_THREAD lbajardsilogic@0: std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl; lbajardsilogic@0: #endif lbajardsilogic@0: lbajardsilogic@0: m_newlyCancelled.erase(token); lbajardsilogic@0: } lbajardsilogic@0: } lbajardsilogic@0: lbajardsilogic@0: