Mercurial > hg > svcore
diff base/FileReadThread.cpp @ 95:040a151d0897
* Add file reader thread, and make the matrix file code use it to preload
fft cache data without glitching
author | Chris Cannam |
---|---|
date | Thu, 04 May 2006 13:59:57 +0000 |
parents | |
children | 1aebdc68ec6d |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/base/FileReadThread.cpp Thu May 04 13:59:57 2006 +0000 @@ -0,0 +1,238 @@ +/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ + +/* + Sonic Visualiser + An audio file viewer and annotation editor. + Centre for Digital Music, Queen Mary, University of London. + This file copyright 2006 Chris Cannam. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of the + License, or (at your option) any later version. See the file + COPYING included with this distribution for more information. +*/ + +#include "FileReadThread.h" + +#include <iostream> + +FileReadThread::FileReadThread() : + m_nextToken(0), + m_exiting(false) +{ +} + +void +FileReadThread::run() +{ + m_mutex.lock(); + + while (!m_exiting) { + if (m_queue.empty()) { + m_condition.wait(&m_mutex, 1000); + } else { + process(); + notifyCancelled(); + } + } + + notifyCancelled(); + m_mutex.unlock(); + + std::cerr << "FileReadThread::run() exiting" << std::endl; +} + +void +FileReadThread::finish() +{ + std::cerr << "FileReadThread::finish()" << std::endl; + + m_mutex.lock(); + while (!m_queue.empty()) { + m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; + m_newlyCancelled.insert(m_queue.begin()->first); + m_queue.erase(m_queue.begin()); + } + + m_exiting = true; + m_mutex.unlock(); + + m_condition.wakeAll(); + + std::cerr << "FileReadThread::finish() exiting" << std::endl; +} + +int +FileReadThread::request(const Request &request) +{ + m_mutex.lock(); + + int token = m_nextToken++; + m_queue[token] = request; + + m_mutex.unlock(); + m_condition.wakeAll(); + + return token; +} + +void +FileReadThread::cancel(int token) +{ + m_mutex.lock(); + + if (m_queue.find(token) != m_queue.end()) { + m_cancelledRequests[token] = m_queue[token]; + m_queue.erase(token); + m_newlyCancelled.insert(token); + } + + m_mutex.unlock(); +} + +bool +FileReadThread::isReady(int token) +{ + m_mutex.lock(); + + bool ready = m_readyRequests.find(token) != m_readyRequests.end(); + + m_mutex.unlock(); + return ready; +} + +bool +FileReadThread::isCancelled(int token) +{ + m_mutex.lock(); + + bool cancelled = + m_cancelledRequests.find(token) != m_cancelledRequests.end() && + m_newlyCancelled.find(token) == m_newlyCancelled.end(); + + m_mutex.unlock(); + return cancelled; +} + +bool +FileReadThread::getRequest(int token, Request &request) +{ + m_mutex.lock(); + + bool found = false; + + if (m_queue.find(token) != m_queue.end()) { + request = m_queue[token]; + found = true; + } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { + request = m_cancelledRequests[token]; + found = true; + } else if (m_readyRequests.find(token) != m_readyRequests.end()) { + request = m_readyRequests[token]; + found = true; + } + + m_mutex.unlock(); + + return found; +} + +void +FileReadThread::done(int token) +{ + m_mutex.lock(); + + bool found = false; + + if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { + m_cancelledRequests.erase(token); + m_newlyCancelled.erase(token); + found = true; + } else if (m_readyRequests.find(token) != m_readyRequests.end()) { + m_readyRequests.erase(token); + found = true; + } else if (m_queue.find(token) != m_queue.end()) { + std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; + } + + m_mutex.unlock(); + + if (!found) { + std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; + } +} + +void +FileReadThread::process() +{ + // entered with m_mutex locked and m_queue non-empty + + int token = m_queue.begin()->first; + Request request = m_queue.begin()->second; + + m_mutex.unlock(); + + std::cerr << "FileReadThread::process: got something to do" << std::endl; + + bool successful = false; + bool seekFailed = false; + ssize_t r = 0; + + if (request.mutex) request.mutex->lock(); + + if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { + seekFailed = true; + } else { + r = ::read(request.fd, request.data, request.size); + } + + if (request.mutex) request.mutex->unlock(); + + if (seekFailed) { + ::perror("Seek failed"); + std::cerr << "ERROR: FileReadThread::process: seek to " + << request.start << " failed" << std::endl; + request.size = 0; + } else { + if (r < 0) { + ::perror("ERROR: FileReadThread::process: Read failed"); + request.size = 0; + } else if (r < ssize_t(request.size)) { + std::cerr << "WARNING: FileReadThread::process: read " + << request.size << " returned only " << r << " bytes" + << std::endl; + request.size = r; + } else { + successful = true; + } + } + + // Check that the token hasn't been cancelled and the thread + // hasn't been asked to finish + + m_mutex.lock(); + + if (m_queue.find(token) != m_queue.end() && !m_exiting) { + m_queue.erase(token); + m_readyRequests[token] = request; + m_mutex.unlock(); + std::cerr << "emitting" << std::endl; + emit ready(token, successful); + m_mutex.lock(); + } +} + +void +FileReadThread::notifyCancelled() +{ + // entered with m_mutex locked + + while (!m_newlyCancelled.empty()) { + int token = *m_newlyCancelled.begin(); + m_newlyCancelled.erase(token); + emit cancelled(token); + } +} + +