diff base/FileReadThread.cpp @ 95:040a151d0897

* Add file reader thread, and make the matrix file code use it to preload fft cache data without glitching
author Chris Cannam
date Thu, 04 May 2006 13:59:57 +0000
parents
children 1aebdc68ec6d
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/base/FileReadThread.cpp	Thu May 04 13:59:57 2006 +0000
@@ -0,0 +1,238 @@
+/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*-  vi:set ts=8 sts=4 sw=4: */
+
+/*
+    Sonic Visualiser
+    An audio file viewer and annotation editor.
+    Centre for Digital Music, Queen Mary, University of London.
+    This file copyright 2006 Chris Cannam.
+    
+    This program is free software; you can redistribute it and/or
+    modify it under the terms of the GNU General Public License as
+    published by the Free Software Foundation; either version 2 of the
+    License, or (at your option) any later version.  See the file
+    COPYING included with this distribution for more information.
+*/
+
+#include "FileReadThread.h"
+
+#include <iostream>
+
+FileReadThread::FileReadThread() :
+    m_nextToken(0),
+    m_exiting(false)
+{
+}
+
+void
+FileReadThread::run()
+{
+    m_mutex.lock();
+
+    while (!m_exiting) {
+        if (m_queue.empty()) {
+            m_condition.wait(&m_mutex, 1000);
+        } else {
+            process();
+            notifyCancelled();
+        }
+    }
+
+    notifyCancelled();
+    m_mutex.unlock();
+
+    std::cerr << "FileReadThread::run() exiting" << std::endl;
+}
+
+void
+FileReadThread::finish()
+{
+    std::cerr << "FileReadThread::finish()" << std::endl;
+
+    m_mutex.lock();
+    while (!m_queue.empty()) {
+        m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
+        m_newlyCancelled.insert(m_queue.begin()->first);
+        m_queue.erase(m_queue.begin());
+    }
+
+    m_exiting = true;
+    m_mutex.unlock();
+
+    m_condition.wakeAll();
+
+    std::cerr << "FileReadThread::finish() exiting" << std::endl;
+}
+
+int
+FileReadThread::request(const Request &request)
+{
+    m_mutex.lock();
+    
+    int token = m_nextToken++;
+    m_queue[token] = request;
+
+    m_mutex.unlock();
+    m_condition.wakeAll();
+
+    return token;
+}
+
+void
+FileReadThread::cancel(int token)
+{
+    m_mutex.lock();
+
+    if (m_queue.find(token) != m_queue.end()) {
+        m_cancelledRequests[token] = m_queue[token];
+        m_queue.erase(token);
+        m_newlyCancelled.insert(token);
+    }
+
+    m_mutex.unlock();
+}
+
+bool
+FileReadThread::isReady(int token)
+{
+    m_mutex.lock();
+
+    bool ready = m_readyRequests.find(token) != m_readyRequests.end();
+
+    m_mutex.unlock();
+    return ready;
+}
+
+bool
+FileReadThread::isCancelled(int token)
+{
+    m_mutex.lock();
+
+    bool cancelled = 
+        m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
+        m_newlyCancelled.find(token) == m_newlyCancelled.end();
+
+    m_mutex.unlock();
+    return cancelled;
+}
+
+bool
+FileReadThread::getRequest(int token, Request &request)
+{
+    m_mutex.lock();
+
+    bool found = false;
+
+    if (m_queue.find(token) != m_queue.end()) {
+        request = m_queue[token];
+        found = true;
+    } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
+        request = m_cancelledRequests[token];
+        found = true;
+    } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
+        request = m_readyRequests[token];
+        found = true;
+    }
+
+    m_mutex.unlock();
+    
+    return found;
+}
+
+void
+FileReadThread::done(int token)
+{
+    m_mutex.lock();
+
+    bool found = false;
+
+    if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
+        m_cancelledRequests.erase(token);
+        m_newlyCancelled.erase(token);
+        found = true;
+    } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
+        m_readyRequests.erase(token);
+        found = true;
+    } else if (m_queue.find(token) != m_queue.end()) {
+        std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
+    }
+
+    m_mutex.unlock();
+
+    if (!found) {
+        std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
+    }
+}
+
+void
+FileReadThread::process()
+{
+    // entered with m_mutex locked and m_queue non-empty
+
+    int token = m_queue.begin()->first;
+    Request request = m_queue.begin()->second;
+
+    m_mutex.unlock();
+
+    std::cerr << "FileReadThread::process: got something to do" << std::endl;
+
+    bool successful = false;
+    bool seekFailed = false;
+    ssize_t r = 0;
+
+    if (request.mutex) request.mutex->lock();
+
+    if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
+        seekFailed = true;
+    } else {
+        r = ::read(request.fd, request.data, request.size);
+    }
+
+    if (request.mutex) request.mutex->unlock();
+
+    if (seekFailed) {
+        ::perror("Seek failed");
+        std::cerr << "ERROR: FileReadThread::process: seek to "
+                  << request.start << " failed" << std::endl;
+        request.size = 0;
+    } else {
+        if (r < 0) {
+            ::perror("ERROR: FileReadThread::process: Read failed");
+            request.size = 0;
+        } else if (r < ssize_t(request.size)) {
+            std::cerr << "WARNING: FileReadThread::process: read "
+                      << request.size << " returned only " << r << " bytes"
+                      << std::endl;
+            request.size = r;
+        } else {
+            successful = true;
+        }
+    }
+        
+    // Check that the token hasn't been cancelled and the thread
+    // hasn't been asked to finish
+    
+    m_mutex.lock();
+        
+    if (m_queue.find(token) != m_queue.end() && !m_exiting) {
+        m_queue.erase(token);
+        m_readyRequests[token] = request;
+        m_mutex.unlock();
+        std::cerr << "emitting" << std::endl;
+        emit ready(token, successful);
+        m_mutex.lock();
+    }
+}
+
+void
+FileReadThread::notifyCancelled()
+{
+    // entered with m_mutex locked
+
+    while (!m_newlyCancelled.empty()) {
+        int token = *m_newlyCancelled.begin();
+        m_newlyCancelled.erase(token);
+        emit cancelled(token);
+    }
+}
+        
+