Mercurial > hg > svcore
diff data/fileio/FileReadThread.cpp @ 148:1a42221a1522
* Reorganising code base. This revision will not compile.
author | Chris Cannam |
---|---|
date | Mon, 31 Jul 2006 11:49:58 +0000 |
parents | |
children | 115f60df1e4d |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/data/fileio/FileReadThread.cpp Mon Jul 31 11:49:58 2006 +0000 @@ -0,0 +1,299 @@ +/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ + +/* + Sonic Visualiser + An audio file viewer and annotation editor. + Centre for Digital Music, Queen Mary, University of London. + This file copyright 2006 Chris Cannam. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of the + License, or (at your option) any later version. See the file + COPYING included with this distribution for more information. +*/ + +#include "FileReadThread.h" + +#include "base/Profiler.h" + +#include <iostream> +#include <unistd.h> + +//#define DEBUG_FILE_READ_THREAD 1 + +FileReadThread::FileReadThread() : + m_nextToken(0), + m_exiting(false) +{ +} + +void +FileReadThread::run() +{ + m_mutex.lock(); + + while (!m_exiting) { + if (m_queue.empty()) { + m_condition.wait(&m_mutex, 1000); + } else { + process(); + } + notifyCancelled(); + } + + notifyCancelled(); + m_mutex.unlock(); + +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::run() exiting" << std::endl; +#endif +} + +void +FileReadThread::finish() +{ +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::finish()" << std::endl; +#endif + + m_mutex.lock(); + while (!m_queue.empty()) { + m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; + m_newlyCancelled.insert(m_queue.begin()->first); + m_queue.erase(m_queue.begin()); + } + + m_exiting = true; + m_mutex.unlock(); + + m_condition.wakeAll(); + +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::finish() exiting" << std::endl; +#endif +} + +int +FileReadThread::request(const Request &request) +{ + m_mutex.lock(); + + int token = m_nextToken++; + m_queue[token] = request; + + m_mutex.unlock(); + m_condition.wakeAll(); + + return token; +} + +void +FileReadThread::cancel(int token) +{ + m_mutex.lock(); + + if (m_queue.find(token) != m_queue.end()) { + m_cancelledRequests[token] = m_queue[token]; + m_queue.erase(token); + m_newlyCancelled.insert(token); + } else if (m_readyRequests.find(token) != m_readyRequests.end()) { + m_cancelledRequests[token] = m_readyRequests[token]; + m_readyRequests.erase(token); + } else { + std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; + } + + m_mutex.unlock(); + +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; +#endif + + m_condition.wakeAll(); +} + +bool +FileReadThread::isReady(int token) +{ + m_mutex.lock(); + + bool ready = m_readyRequests.find(token) != m_readyRequests.end(); + + m_mutex.unlock(); + return ready; +} + +bool +FileReadThread::isCancelled(int token) +{ + m_mutex.lock(); + + bool cancelled = + m_cancelledRequests.find(token) != m_cancelledRequests.end() && + m_newlyCancelled.find(token) == m_newlyCancelled.end(); + + m_mutex.unlock(); + return cancelled; +} + +bool +FileReadThread::getRequest(int token, Request &request) +{ + m_mutex.lock(); + + bool found = false; + + if (m_queue.find(token) != m_queue.end()) { + request = m_queue[token]; + found = true; + } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { + request = m_cancelledRequests[token]; + found = true; + } else if (m_readyRequests.find(token) != m_readyRequests.end()) { + request = m_readyRequests[token]; + found = true; + } + + m_mutex.unlock(); + + return found; +} + +void +FileReadThread::done(int token) +{ + m_mutex.lock(); + + bool found = false; + + if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { + m_cancelledRequests.erase(token); + m_newlyCancelled.erase(token); + found = true; + } else if (m_readyRequests.find(token) != m_readyRequests.end()) { + m_readyRequests.erase(token); + found = true; + } else if (m_queue.find(token) != m_queue.end()) { + std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; + } + + m_mutex.unlock(); + + if (!found) { + std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; + } +} + +void +FileReadThread::process() +{ + // entered with m_mutex locked and m_queue non-empty + +#ifdef DEBUG_FILE_READ_THREAD + Profiler profiler("FileReadThread::process()", true); +#endif + + int token = m_queue.begin()->first; + Request request = m_queue.begin()->second; + + m_mutex.unlock(); + +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl; +#endif + + bool successful = false; + bool seekFailed = false; + ssize_t r = 0; + + if (request.mutex) request.mutex->lock(); + + if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { + seekFailed = true; + } else { + + // if request.size is large, we want to avoid making a single + // system call to read it all as it may block too much + + static const size_t blockSize = 256 * 1024; + + size_t size = request.size; + char *destination = request.data; + + while (size > 0) { + size_t readSize = size; + if (readSize > blockSize) readSize = blockSize; + ssize_t br = ::read(request.fd, destination, readSize); + if (br < 0) { + r = br; + break; + } else { + r += br; + if (br < ssize_t(readSize)) break; + } + destination += readSize; + size -= readSize; + } + } + + if (request.mutex) request.mutex->unlock(); + + if (seekFailed) { + ::perror("Seek failed"); + std::cerr << "ERROR: FileReadThread::process: seek to " + << request.start << " failed" << std::endl; + request.size = 0; + } else { + if (r < 0) { + ::perror("ERROR: FileReadThread::process: Read failed"); + request.size = 0; + } else if (r < ssize_t(request.size)) { + std::cerr << "WARNING: FileReadThread::process: read " + << request.size << " returned only " << r << " bytes" + << std::endl; + request.size = r; + usleep(100000); + } else { + successful = true; + } + } + + // Check that the token hasn't been cancelled and the thread + // hasn't been asked to finish + + m_mutex.lock(); + + request.successful = successful; + + if (m_queue.find(token) != m_queue.end() && !m_exiting) { + m_queue.erase(token); + m_readyRequests[token] = request; +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::process: done, marking as ready" << std::endl; +#endif + } else { +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl; +#endif + } +} + +void +FileReadThread::notifyCancelled() +{ + // entered with m_mutex locked + + while (!m_newlyCancelled.empty()) { + + int token = *m_newlyCancelled.begin(); + +#ifdef DEBUG_FILE_READ_THREAD + std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl; +#endif + + m_newlyCancelled.erase(token); + } +} + +