annotate data/fileio/FileReadThread.cpp @ 329:3179d8b29336

* Another incremental Transform update
author Chris Cannam
date Tue, 06 Nov 2007 17:08:11 +0000
parents 1a42221a1522
children 115f60df1e4d
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
#include "FileReadThread.h"

#include "base/Profiler.h"

#include <cerrno>
#include <cstdio>
#include <iostream>
#include <unistd.h>
Chris@148 22
Chris@148 23 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 24
Chris@148 25 FileReadThread::FileReadThread() :
Chris@148 26 m_nextToken(0),
Chris@148 27 m_exiting(false)
Chris@148 28 {
Chris@148 29 }
Chris@148 30
Chris@148 31 void
Chris@148 32 FileReadThread::run()
Chris@148 33 {
Chris@148 34 m_mutex.lock();
Chris@148 35
Chris@148 36 while (!m_exiting) {
Chris@148 37 if (m_queue.empty()) {
Chris@148 38 m_condition.wait(&m_mutex, 1000);
Chris@148 39 } else {
Chris@148 40 process();
Chris@148 41 }
Chris@148 42 notifyCancelled();
Chris@148 43 }
Chris@148 44
Chris@148 45 notifyCancelled();
Chris@148 46 m_mutex.unlock();
Chris@148 47
Chris@148 48 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 49 std::cerr << "FileReadThread::run() exiting" << std::endl;
Chris@148 50 #endif
Chris@148 51 }
Chris@148 52
Chris@148 53 void
Chris@148 54 FileReadThread::finish()
Chris@148 55 {
Chris@148 56 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 57 std::cerr << "FileReadThread::finish()" << std::endl;
Chris@148 58 #endif
Chris@148 59
Chris@148 60 m_mutex.lock();
Chris@148 61 while (!m_queue.empty()) {
Chris@148 62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@148 63 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@148 64 m_queue.erase(m_queue.begin());
Chris@148 65 }
Chris@148 66
Chris@148 67 m_exiting = true;
Chris@148 68 m_mutex.unlock();
Chris@148 69
Chris@148 70 m_condition.wakeAll();
Chris@148 71
Chris@148 72 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 73 std::cerr << "FileReadThread::finish() exiting" << std::endl;
Chris@148 74 #endif
Chris@148 75 }
Chris@148 76
Chris@148 77 int
Chris@148 78 FileReadThread::request(const Request &request)
Chris@148 79 {
Chris@148 80 m_mutex.lock();
Chris@148 81
Chris@148 82 int token = m_nextToken++;
Chris@148 83 m_queue[token] = request;
Chris@148 84
Chris@148 85 m_mutex.unlock();
Chris@148 86 m_condition.wakeAll();
Chris@148 87
Chris@148 88 return token;
Chris@148 89 }
Chris@148 90
Chris@148 91 void
Chris@148 92 FileReadThread::cancel(int token)
Chris@148 93 {
Chris@148 94 m_mutex.lock();
Chris@148 95
Chris@148 96 if (m_queue.find(token) != m_queue.end()) {
Chris@148 97 m_cancelledRequests[token] = m_queue[token];
Chris@148 98 m_queue.erase(token);
Chris@148 99 m_newlyCancelled.insert(token);
Chris@148 100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 101 m_cancelledRequests[token] = m_readyRequests[token];
Chris@148 102 m_readyRequests.erase(token);
Chris@148 103 } else {
Chris@148 104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
Chris@148 105 }
Chris@148 106
Chris@148 107 m_mutex.unlock();
Chris@148 108
Chris@148 109 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
Chris@148 111 #endif
Chris@148 112
Chris@148 113 m_condition.wakeAll();
Chris@148 114 }
Chris@148 115
Chris@148 116 bool
Chris@148 117 FileReadThread::isReady(int token)
Chris@148 118 {
Chris@148 119 m_mutex.lock();
Chris@148 120
Chris@148 121 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 122
Chris@148 123 m_mutex.unlock();
Chris@148 124 return ready;
Chris@148 125 }
Chris@148 126
Chris@148 127 bool
Chris@148 128 FileReadThread::isCancelled(int token)
Chris@148 129 {
Chris@148 130 m_mutex.lock();
Chris@148 131
Chris@148 132 bool cancelled =
Chris@148 133 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 134 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 135
Chris@148 136 m_mutex.unlock();
Chris@148 137 return cancelled;
Chris@148 138 }
Chris@148 139
Chris@148 140 bool
Chris@148 141 FileReadThread::getRequest(int token, Request &request)
Chris@148 142 {
Chris@148 143 m_mutex.lock();
Chris@148 144
Chris@148 145 bool found = false;
Chris@148 146
Chris@148 147 if (m_queue.find(token) != m_queue.end()) {
Chris@148 148 request = m_queue[token];
Chris@148 149 found = true;
Chris@148 150 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 151 request = m_cancelledRequests[token];
Chris@148 152 found = true;
Chris@148 153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 154 request = m_readyRequests[token];
Chris@148 155 found = true;
Chris@148 156 }
Chris@148 157
Chris@148 158 m_mutex.unlock();
Chris@148 159
Chris@148 160 return found;
Chris@148 161 }
Chris@148 162
Chris@148 163 void
Chris@148 164 FileReadThread::done(int token)
Chris@148 165 {
Chris@148 166 m_mutex.lock();
Chris@148 167
Chris@148 168 bool found = false;
Chris@148 169
Chris@148 170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 171 m_cancelledRequests.erase(token);
Chris@148 172 m_newlyCancelled.erase(token);
Chris@148 173 found = true;
Chris@148 174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 175 m_readyRequests.erase(token);
Chris@148 176 found = true;
Chris@148 177 } else if (m_queue.find(token) != m_queue.end()) {
Chris@148 178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
Chris@148 179 }
Chris@148 180
Chris@148 181 m_mutex.unlock();
Chris@148 182
Chris@148 183 if (!found) {
Chris@148 184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
Chris@148 185 }
Chris@148 186 }
Chris@148 187
Chris@148 188 void
Chris@148 189 FileReadThread::process()
Chris@148 190 {
Chris@148 191 // entered with m_mutex locked and m_queue non-empty
Chris@148 192
Chris@148 193 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 194 Profiler profiler("FileReadThread::process()", true);
Chris@148 195 #endif
Chris@148 196
Chris@148 197 int token = m_queue.begin()->first;
Chris@148 198 Request request = m_queue.begin()->second;
Chris@148 199
Chris@148 200 m_mutex.unlock();
Chris@148 201
Chris@148 202 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 203 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
Chris@148 204 #endif
Chris@148 205
Chris@148 206 bool successful = false;
Chris@148 207 bool seekFailed = false;
Chris@148 208 ssize_t r = 0;
Chris@148 209
Chris@148 210 if (request.mutex) request.mutex->lock();
Chris@148 211
Chris@148 212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@148 213 seekFailed = true;
Chris@148 214 } else {
Chris@148 215
Chris@148 216 // if request.size is large, we want to avoid making a single
Chris@148 217 // system call to read it all as it may block too much
Chris@148 218
Chris@148 219 static const size_t blockSize = 256 * 1024;
Chris@148 220
Chris@148 221 size_t size = request.size;
Chris@148 222 char *destination = request.data;
Chris@148 223
Chris@148 224 while (size > 0) {
Chris@148 225 size_t readSize = size;
Chris@148 226 if (readSize > blockSize) readSize = blockSize;
Chris@148 227 ssize_t br = ::read(request.fd, destination, readSize);
Chris@148 228 if (br < 0) {
Chris@148 229 r = br;
Chris@148 230 break;
Chris@148 231 } else {
Chris@148 232 r += br;
Chris@148 233 if (br < ssize_t(readSize)) break;
Chris@148 234 }
Chris@148 235 destination += readSize;
Chris@148 236 size -= readSize;
Chris@148 237 }
Chris@148 238 }
Chris@148 239
Chris@148 240 if (request.mutex) request.mutex->unlock();
Chris@148 241
Chris@148 242 if (seekFailed) {
Chris@148 243 ::perror("Seek failed");
Chris@148 244 std::cerr << "ERROR: FileReadThread::process: seek to "
Chris@148 245 << request.start << " failed" << std::endl;
Chris@148 246 request.size = 0;
Chris@148 247 } else {
Chris@148 248 if (r < 0) {
Chris@148 249 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@148 250 request.size = 0;
Chris@148 251 } else if (r < ssize_t(request.size)) {
Chris@148 252 std::cerr << "WARNING: FileReadThread::process: read "
Chris@148 253 << request.size << " returned only " << r << " bytes"
Chris@148 254 << std::endl;
Chris@148 255 request.size = r;
Chris@148 256 usleep(100000);
Chris@148 257 } else {
Chris@148 258 successful = true;
Chris@148 259 }
Chris@148 260 }
Chris@148 261
Chris@148 262 // Check that the token hasn't been cancelled and the thread
Chris@148 263 // hasn't been asked to finish
Chris@148 264
Chris@148 265 m_mutex.lock();
Chris@148 266
Chris@148 267 request.successful = successful;
Chris@148 268
Chris@148 269 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 270 m_queue.erase(token);
Chris@148 271 m_readyRequests[token] = request;
Chris@148 272 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 273 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
Chris@148 274 #endif
Chris@148 275 } else {
Chris@148 276 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 277 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
Chris@148 278 #endif
Chris@148 279 }
Chris@148 280 }
Chris@148 281
Chris@148 282 void
Chris@148 283 FileReadThread::notifyCancelled()
Chris@148 284 {
Chris@148 285 // entered with m_mutex locked
Chris@148 286
Chris@148 287 while (!m_newlyCancelled.empty()) {
Chris@148 288
Chris@148 289 int token = *m_newlyCancelled.begin();
Chris@148 290
Chris@148 291 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 292 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
Chris@148 293 #endif
Chris@148 294
Chris@148 295 m_newlyCancelled.erase(token);
Chris@148 296 }
Chris@148 297 }
Chris@148 298
Chris@148 299