annotate data/fileio/FileReadThread.cpp @ 316:3a6725f285d6

* Make RemoteFile far more pervasive, and use it for local files as well so that we can handle both transparently. Make it shallow copy with reference counting, so it can be used by value without having to worry about the cache file lifetime. Use RemoteFile for MainWindow file-open functions, etc
author Chris Cannam
date Thu, 18 Oct 2007 15:31:20 +0000
parents 1a42221a1522
children 115f60df1e4d
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
Chris@148 16 #include "FileReadThread.h"
Chris@148 17
Chris@148 18 #include "base/Profiler.h"
Chris@148 19
Chris@148 20 #include <iostream>
Chris@148 21 #include <unistd.h>
Chris@148 22
Chris@148 23 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 24
Chris@148 25 FileReadThread::FileReadThread() :
Chris@148 26 m_nextToken(0),
Chris@148 27 m_exiting(false)
Chris@148 28 {
Chris@148 29 }
Chris@148 30
Chris@148 31 void
Chris@148 32 FileReadThread::run()
Chris@148 33 {
Chris@148 34 m_mutex.lock();
Chris@148 35
Chris@148 36 while (!m_exiting) {
Chris@148 37 if (m_queue.empty()) {
Chris@148 38 m_condition.wait(&m_mutex, 1000);
Chris@148 39 } else {
Chris@148 40 process();
Chris@148 41 }
Chris@148 42 notifyCancelled();
Chris@148 43 }
Chris@148 44
Chris@148 45 notifyCancelled();
Chris@148 46 m_mutex.unlock();
Chris@148 47
Chris@148 48 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 49 std::cerr << "FileReadThread::run() exiting" << std::endl;
Chris@148 50 #endif
Chris@148 51 }
Chris@148 52
Chris@148 53 void
Chris@148 54 FileReadThread::finish()
Chris@148 55 {
Chris@148 56 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 57 std::cerr << "FileReadThread::finish()" << std::endl;
Chris@148 58 #endif
Chris@148 59
Chris@148 60 m_mutex.lock();
Chris@148 61 while (!m_queue.empty()) {
Chris@148 62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@148 63 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@148 64 m_queue.erase(m_queue.begin());
Chris@148 65 }
Chris@148 66
Chris@148 67 m_exiting = true;
Chris@148 68 m_mutex.unlock();
Chris@148 69
Chris@148 70 m_condition.wakeAll();
Chris@148 71
Chris@148 72 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 73 std::cerr << "FileReadThread::finish() exiting" << std::endl;
Chris@148 74 #endif
Chris@148 75 }
Chris@148 76
Chris@148 77 int
Chris@148 78 FileReadThread::request(const Request &request)
Chris@148 79 {
Chris@148 80 m_mutex.lock();
Chris@148 81
Chris@148 82 int token = m_nextToken++;
Chris@148 83 m_queue[token] = request;
Chris@148 84
Chris@148 85 m_mutex.unlock();
Chris@148 86 m_condition.wakeAll();
Chris@148 87
Chris@148 88 return token;
Chris@148 89 }
Chris@148 90
Chris@148 91 void
Chris@148 92 FileReadThread::cancel(int token)
Chris@148 93 {
Chris@148 94 m_mutex.lock();
Chris@148 95
Chris@148 96 if (m_queue.find(token) != m_queue.end()) {
Chris@148 97 m_cancelledRequests[token] = m_queue[token];
Chris@148 98 m_queue.erase(token);
Chris@148 99 m_newlyCancelled.insert(token);
Chris@148 100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 101 m_cancelledRequests[token] = m_readyRequests[token];
Chris@148 102 m_readyRequests.erase(token);
Chris@148 103 } else {
Chris@148 104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
Chris@148 105 }
Chris@148 106
Chris@148 107 m_mutex.unlock();
Chris@148 108
Chris@148 109 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
Chris@148 111 #endif
Chris@148 112
Chris@148 113 m_condition.wakeAll();
Chris@148 114 }
Chris@148 115
Chris@148 116 bool
Chris@148 117 FileReadThread::isReady(int token)
Chris@148 118 {
Chris@148 119 m_mutex.lock();
Chris@148 120
Chris@148 121 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 122
Chris@148 123 m_mutex.unlock();
Chris@148 124 return ready;
Chris@148 125 }
Chris@148 126
Chris@148 127 bool
Chris@148 128 FileReadThread::isCancelled(int token)
Chris@148 129 {
Chris@148 130 m_mutex.lock();
Chris@148 131
Chris@148 132 bool cancelled =
Chris@148 133 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 134 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 135
Chris@148 136 m_mutex.unlock();
Chris@148 137 return cancelled;
Chris@148 138 }
Chris@148 139
Chris@148 140 bool
Chris@148 141 FileReadThread::getRequest(int token, Request &request)
Chris@148 142 {
Chris@148 143 m_mutex.lock();
Chris@148 144
Chris@148 145 bool found = false;
Chris@148 146
Chris@148 147 if (m_queue.find(token) != m_queue.end()) {
Chris@148 148 request = m_queue[token];
Chris@148 149 found = true;
Chris@148 150 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 151 request = m_cancelledRequests[token];
Chris@148 152 found = true;
Chris@148 153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 154 request = m_readyRequests[token];
Chris@148 155 found = true;
Chris@148 156 }
Chris@148 157
Chris@148 158 m_mutex.unlock();
Chris@148 159
Chris@148 160 return found;
Chris@148 161 }
Chris@148 162
Chris@148 163 void
Chris@148 164 FileReadThread::done(int token)
Chris@148 165 {
Chris@148 166 m_mutex.lock();
Chris@148 167
Chris@148 168 bool found = false;
Chris@148 169
Chris@148 170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 171 m_cancelledRequests.erase(token);
Chris@148 172 m_newlyCancelled.erase(token);
Chris@148 173 found = true;
Chris@148 174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 175 m_readyRequests.erase(token);
Chris@148 176 found = true;
Chris@148 177 } else if (m_queue.find(token) != m_queue.end()) {
Chris@148 178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
Chris@148 179 }
Chris@148 180
Chris@148 181 m_mutex.unlock();
Chris@148 182
Chris@148 183 if (!found) {
Chris@148 184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
Chris@148 185 }
Chris@148 186 }
Chris@148 187
Chris@148 188 void
Chris@148 189 FileReadThread::process()
Chris@148 190 {
Chris@148 191 // entered with m_mutex locked and m_queue non-empty
Chris@148 192
Chris@148 193 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 194 Profiler profiler("FileReadThread::process()", true);
Chris@148 195 #endif
Chris@148 196
Chris@148 197 int token = m_queue.begin()->first;
Chris@148 198 Request request = m_queue.begin()->second;
Chris@148 199
Chris@148 200 m_mutex.unlock();
Chris@148 201
Chris@148 202 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 203 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
Chris@148 204 #endif
Chris@148 205
Chris@148 206 bool successful = false;
Chris@148 207 bool seekFailed = false;
Chris@148 208 ssize_t r = 0;
Chris@148 209
Chris@148 210 if (request.mutex) request.mutex->lock();
Chris@148 211
Chris@148 212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@148 213 seekFailed = true;
Chris@148 214 } else {
Chris@148 215
Chris@148 216 // if request.size is large, we want to avoid making a single
Chris@148 217 // system call to read it all as it may block too much
Chris@148 218
Chris@148 219 static const size_t blockSize = 256 * 1024;
Chris@148 220
Chris@148 221 size_t size = request.size;
Chris@148 222 char *destination = request.data;
Chris@148 223
Chris@148 224 while (size > 0) {
Chris@148 225 size_t readSize = size;
Chris@148 226 if (readSize > blockSize) readSize = blockSize;
Chris@148 227 ssize_t br = ::read(request.fd, destination, readSize);
Chris@148 228 if (br < 0) {
Chris@148 229 r = br;
Chris@148 230 break;
Chris@148 231 } else {
Chris@148 232 r += br;
Chris@148 233 if (br < ssize_t(readSize)) break;
Chris@148 234 }
Chris@148 235 destination += readSize;
Chris@148 236 size -= readSize;
Chris@148 237 }
Chris@148 238 }
Chris@148 239
Chris@148 240 if (request.mutex) request.mutex->unlock();
Chris@148 241
Chris@148 242 if (seekFailed) {
Chris@148 243 ::perror("Seek failed");
Chris@148 244 std::cerr << "ERROR: FileReadThread::process: seek to "
Chris@148 245 << request.start << " failed" << std::endl;
Chris@148 246 request.size = 0;
Chris@148 247 } else {
Chris@148 248 if (r < 0) {
Chris@148 249 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@148 250 request.size = 0;
Chris@148 251 } else if (r < ssize_t(request.size)) {
Chris@148 252 std::cerr << "WARNING: FileReadThread::process: read "
Chris@148 253 << request.size << " returned only " << r << " bytes"
Chris@148 254 << std::endl;
Chris@148 255 request.size = r;
Chris@148 256 usleep(100000);
Chris@148 257 } else {
Chris@148 258 successful = true;
Chris@148 259 }
Chris@148 260 }
Chris@148 261
Chris@148 262 // Check that the token hasn't been cancelled and the thread
Chris@148 263 // hasn't been asked to finish
Chris@148 264
Chris@148 265 m_mutex.lock();
Chris@148 266
Chris@148 267 request.successful = successful;
Chris@148 268
Chris@148 269 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 270 m_queue.erase(token);
Chris@148 271 m_readyRequests[token] = request;
Chris@148 272 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 273 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
Chris@148 274 #endif
Chris@148 275 } else {
Chris@148 276 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 277 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
Chris@148 278 #endif
Chris@148 279 }
Chris@148 280 }
Chris@148 281
Chris@148 282 void
Chris@148 283 FileReadThread::notifyCancelled()
Chris@148 284 {
Chris@148 285 // entered with m_mutex locked
Chris@148 286
Chris@148 287 while (!m_newlyCancelled.empty()) {
Chris@148 288
Chris@148 289 int token = *m_newlyCancelled.begin();
Chris@148 290
Chris@148 291 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 292 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
Chris@148 293 #endif
Chris@148 294
Chris@148 295 m_newlyCancelled.erase(token);
Chris@148 296 }
Chris@148 297 }
Chris@148 298
Chris@148 299