annotate base/FileReadThread.cpp @ 96:1aebdc68ec6d

* Introduce simple non-RT thread base class * Rename MatrixFileCache to MatrixFile * some fixes & tidying
author Chris Cannam
date Thu, 04 May 2006 16:03:02 +0000
parents 040a151d0897
children 2b1a16e38d2d
rev   line source
Chris@95 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@95 2
Chris@95 3 /*
Chris@95 4 Sonic Visualiser
Chris@95 5 An audio file viewer and annotation editor.
Chris@95 6 Centre for Digital Music, Queen Mary, University of London.
Chris@95 7 This file copyright 2006 Chris Cannam.
Chris@95 8
Chris@95 9 This program is free software; you can redistribute it and/or
Chris@95 10 modify it under the terms of the GNU General Public License as
Chris@95 11 published by the Free Software Foundation; either version 2 of the
Chris@95 12 License, or (at your option) any later version. See the file
Chris@95 13 COPYING included with this distribution for more information.
Chris@95 14 */
Chris@95 15
Chris@95 16 #include "FileReadThread.h"
Chris@95 17
Chris@95 18 #include <iostream>
Chris@95 19
Chris@95 20 FileReadThread::FileReadThread() :
Chris@95 21 m_nextToken(0),
Chris@95 22 m_exiting(false)
Chris@95 23 {
Chris@95 24 }
Chris@95 25
Chris@95 26 void
Chris@95 27 FileReadThread::run()
Chris@95 28 {
Chris@95 29 m_mutex.lock();
Chris@95 30
Chris@95 31 while (!m_exiting) {
Chris@95 32 if (m_queue.empty()) {
Chris@95 33 m_condition.wait(&m_mutex, 1000);
Chris@95 34 } else {
Chris@95 35 process();
Chris@95 36 }
Chris@96 37 notifyCancelled();
Chris@95 38 }
Chris@95 39
Chris@95 40 notifyCancelled();
Chris@95 41 m_mutex.unlock();
Chris@95 42
Chris@95 43 std::cerr << "FileReadThread::run() exiting" << std::endl;
Chris@95 44 }
Chris@95 45
Chris@95 46 void
Chris@95 47 FileReadThread::finish()
Chris@95 48 {
Chris@95 49 std::cerr << "FileReadThread::finish()" << std::endl;
Chris@95 50
Chris@95 51 m_mutex.lock();
Chris@95 52 while (!m_queue.empty()) {
Chris@95 53 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@95 54 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@95 55 m_queue.erase(m_queue.begin());
Chris@95 56 }
Chris@95 57
Chris@95 58 m_exiting = true;
Chris@95 59 m_mutex.unlock();
Chris@95 60
Chris@95 61 m_condition.wakeAll();
Chris@95 62
Chris@95 63 std::cerr << "FileReadThread::finish() exiting" << std::endl;
Chris@95 64 }
Chris@95 65
Chris@95 66 int
Chris@95 67 FileReadThread::request(const Request &request)
Chris@95 68 {
Chris@95 69 m_mutex.lock();
Chris@95 70
Chris@95 71 int token = m_nextToken++;
Chris@95 72 m_queue[token] = request;
Chris@95 73
Chris@95 74 m_mutex.unlock();
Chris@95 75 m_condition.wakeAll();
Chris@95 76
Chris@95 77 return token;
Chris@95 78 }
Chris@95 79
Chris@95 80 void
Chris@95 81 FileReadThread::cancel(int token)
Chris@95 82 {
Chris@95 83 m_mutex.lock();
Chris@95 84
Chris@95 85 if (m_queue.find(token) != m_queue.end()) {
Chris@95 86 m_cancelledRequests[token] = m_queue[token];
Chris@95 87 m_queue.erase(token);
Chris@95 88 m_newlyCancelled.insert(token);
Chris@96 89 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@96 90 m_cancelledRequests[token] = m_readyRequests[token];
Chris@96 91 m_readyRequests.erase(token);
Chris@96 92 } else {
Chris@96 93 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
Chris@95 94 }
Chris@95 95
Chris@95 96 m_mutex.unlock();
Chris@96 97
Chris@96 98 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
Chris@96 99
Chris@96 100 m_condition.wakeAll();
Chris@95 101 }
Chris@95 102
Chris@95 103 bool
Chris@95 104 FileReadThread::isReady(int token)
Chris@95 105 {
Chris@95 106 m_mutex.lock();
Chris@95 107
Chris@95 108 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@95 109
Chris@95 110 m_mutex.unlock();
Chris@95 111 return ready;
Chris@95 112 }
Chris@95 113
Chris@95 114 bool
Chris@95 115 FileReadThread::isCancelled(int token)
Chris@95 116 {
Chris@95 117 m_mutex.lock();
Chris@95 118
Chris@95 119 bool cancelled =
Chris@95 120 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@95 121 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@95 122
Chris@95 123 m_mutex.unlock();
Chris@95 124 return cancelled;
Chris@95 125 }
Chris@95 126
Chris@95 127 bool
Chris@95 128 FileReadThread::getRequest(int token, Request &request)
Chris@95 129 {
Chris@95 130 m_mutex.lock();
Chris@95 131
Chris@95 132 bool found = false;
Chris@95 133
Chris@95 134 if (m_queue.find(token) != m_queue.end()) {
Chris@95 135 request = m_queue[token];
Chris@95 136 found = true;
Chris@95 137 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@95 138 request = m_cancelledRequests[token];
Chris@95 139 found = true;
Chris@95 140 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@95 141 request = m_readyRequests[token];
Chris@95 142 found = true;
Chris@95 143 }
Chris@95 144
Chris@95 145 m_mutex.unlock();
Chris@95 146
Chris@95 147 return found;
Chris@95 148 }
Chris@95 149
Chris@95 150 void
Chris@95 151 FileReadThread::done(int token)
Chris@95 152 {
Chris@95 153 m_mutex.lock();
Chris@95 154
Chris@95 155 bool found = false;
Chris@95 156
Chris@95 157 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@95 158 m_cancelledRequests.erase(token);
Chris@95 159 m_newlyCancelled.erase(token);
Chris@95 160 found = true;
Chris@95 161 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@95 162 m_readyRequests.erase(token);
Chris@95 163 found = true;
Chris@95 164 } else if (m_queue.find(token) != m_queue.end()) {
Chris@95 165 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
Chris@95 166 }
Chris@95 167
Chris@95 168 m_mutex.unlock();
Chris@95 169
Chris@95 170 if (!found) {
Chris@95 171 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
Chris@95 172 }
Chris@95 173 }
Chris@95 174
Chris@95 175 void
Chris@95 176 FileReadThread::process()
Chris@95 177 {
Chris@95 178 // entered with m_mutex locked and m_queue non-empty
Chris@95 179
Chris@95 180 int token = m_queue.begin()->first;
Chris@95 181 Request request = m_queue.begin()->second;
Chris@95 182
Chris@95 183 m_mutex.unlock();
Chris@95 184
Chris@95 185 std::cerr << "FileReadThread::process: got something to do" << std::endl;
Chris@95 186
Chris@95 187 bool successful = false;
Chris@95 188 bool seekFailed = false;
Chris@95 189 ssize_t r = 0;
Chris@95 190
Chris@95 191 if (request.mutex) request.mutex->lock();
Chris@95 192
Chris@95 193 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@95 194 seekFailed = true;
Chris@95 195 } else {
Chris@96 196
Chris@96 197 // if request.size is large, we want to avoid making a single
Chris@96 198 // system call to read it all as it may block too much
Chris@96 199
Chris@96 200 static const size_t blockSize = 16384;
Chris@96 201
Chris@96 202 size_t size = request.size;
Chris@96 203 char *destination = request.data;
Chris@96 204
Chris@96 205 while (size > 0) {
Chris@96 206 size_t readSize = size;
Chris@96 207 if (readSize > blockSize) readSize = blockSize;
Chris@96 208 ssize_t br = ::read(request.fd, destination, readSize);
Chris@96 209 if (br < 0) {
Chris@96 210 r = br;
Chris@96 211 break;
Chris@96 212 } else {
Chris@96 213 r += br;
Chris@96 214 if (br < ssize_t(readSize)) break;
Chris@96 215 }
Chris@96 216 destination += readSize;
Chris@96 217 size -= readSize;
Chris@96 218 }
Chris@95 219 }
Chris@95 220
Chris@95 221 if (request.mutex) request.mutex->unlock();
Chris@95 222
Chris@95 223 if (seekFailed) {
Chris@95 224 ::perror("Seek failed");
Chris@95 225 std::cerr << "ERROR: FileReadThread::process: seek to "
Chris@95 226 << request.start << " failed" << std::endl;
Chris@95 227 request.size = 0;
Chris@95 228 } else {
Chris@95 229 if (r < 0) {
Chris@95 230 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@95 231 request.size = 0;
Chris@95 232 } else if (r < ssize_t(request.size)) {
Chris@95 233 std::cerr << "WARNING: FileReadThread::process: read "
Chris@95 234 << request.size << " returned only " << r << " bytes"
Chris@95 235 << std::endl;
Chris@95 236 request.size = r;
Chris@95 237 } else {
Chris@95 238 successful = true;
Chris@95 239 }
Chris@95 240 }
Chris@95 241
Chris@95 242 // Check that the token hasn't been cancelled and the thread
Chris@95 243 // hasn't been asked to finish
Chris@95 244
Chris@95 245 m_mutex.lock();
Chris@95 246
Chris@95 247 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@95 248 m_queue.erase(token);
Chris@95 249 m_readyRequests[token] = request;
Chris@95 250 m_mutex.unlock();
Chris@95 251 std::cerr << "emitting" << std::endl;
Chris@95 252 emit ready(token, successful);
Chris@95 253 m_mutex.lock();
Chris@95 254 }
Chris@95 255 }
Chris@95 256
Chris@95 257 void
Chris@95 258 FileReadThread::notifyCancelled()
Chris@95 259 {
Chris@95 260 // entered with m_mutex locked
Chris@95 261
Chris@95 262 while (!m_newlyCancelled.empty()) {
Chris@96 263
Chris@95 264 int token = *m_newlyCancelled.begin();
Chris@96 265
Chris@96 266 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
Chris@96 267
Chris@95 268 m_newlyCancelled.erase(token);
Chris@95 269 emit cancelled(token);
Chris@95 270 }
Chris@95 271 }
Chris@95 272
Chris@95 273