annotate data/fileio/FileReadThread.cpp @ 282:d9319859a4cf tip

(none)
author benoitrigolleau
date Fri, 31 Oct 2008 11:00:24 +0000
parents fc9323a41f5a
children
rev   line source
lbajardsilogic@0 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
lbajardsilogic@0 2
lbajardsilogic@0 3 /*
lbajardsilogic@0 4 Sonic Visualiser
lbajardsilogic@0 5 An audio file viewer and annotation editor.
lbajardsilogic@0 6 Centre for Digital Music, Queen Mary, University of London.
lbajardsilogic@0 7 This file copyright 2006 Chris Cannam.
lbajardsilogic@0 8
lbajardsilogic@0 9 This program is free software; you can redistribute it and/or
lbajardsilogic@0 10 modify it under the terms of the GNU General Public License as
lbajardsilogic@0 11 published by the Free Software Foundation; either version 2 of the
lbajardsilogic@0 12 License, or (at your option) any later version. See the file
lbajardsilogic@0 13 COPYING included with this distribution for more information.
lbajardsilogic@0 14 */
lbajardsilogic@0 15
lbajardsilogic@0 16 #include "FileReadThread.h"
lbajardsilogic@0 17
lbajardsilogic@0 18 #include "base/Profiler.h"
lbajardsilogic@0 19
lbajardsilogic@0 20 #include <iostream>
lbajardsilogic@0 21 //#include <unistd.h>
lbajardsilogic@0 22
lbajardsilogic@0 23 //#define DEBUG_FILE_READ_THREAD 1
lbajardsilogic@0 24
lbajardsilogic@0 25 FileReadThread::FileReadThread() :
lbajardsilogic@0 26 m_nextToken(0),
lbajardsilogic@0 27 m_exiting(false)
lbajardsilogic@0 28 {
lbajardsilogic@0 29 }
lbajardsilogic@0 30
lbajardsilogic@0 31 void
lbajardsilogic@0 32 FileReadThread::run()
lbajardsilogic@0 33 {
lbajardsilogic@0 34 m_mutex.lock();
lbajardsilogic@0 35
lbajardsilogic@0 36 while (!m_exiting) {
lbajardsilogic@0 37 if (m_queue.empty()) {
lbajardsilogic@0 38 m_condition.wait(&m_mutex, 1000);
lbajardsilogic@0 39 } else {
lbajardsilogic@0 40 process();
lbajardsilogic@0 41 }
lbajardsilogic@0 42 notifyCancelled();
lbajardsilogic@0 43 }
lbajardsilogic@0 44
lbajardsilogic@0 45 notifyCancelled();
lbajardsilogic@0 46 m_mutex.unlock();
lbajardsilogic@0 47
lbajardsilogic@0 48 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 49 std::cerr << "FileReadThread::run() exiting" << std::endl;
lbajardsilogic@0 50 #endif
lbajardsilogic@0 51 }
lbajardsilogic@0 52
lbajardsilogic@0 53 void
lbajardsilogic@0 54 FileReadThread::finish()
lbajardsilogic@0 55 {
lbajardsilogic@0 56 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 57 std::cerr << "FileReadThread::finish()" << std::endl;
lbajardsilogic@0 58 #endif
lbajardsilogic@0 59
lbajardsilogic@0 60 m_mutex.lock();
lbajardsilogic@0 61 while (!m_queue.empty()) {
lbajardsilogic@0 62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
lbajardsilogic@0 63 m_newlyCancelled.insert(m_queue.begin()->first);
lbajardsilogic@0 64 m_queue.erase(m_queue.begin());
lbajardsilogic@0 65 }
lbajardsilogic@0 66
lbajardsilogic@0 67 m_exiting = true;
lbajardsilogic@0 68 m_mutex.unlock();
lbajardsilogic@0 69
lbajardsilogic@0 70 m_condition.wakeAll();
lbajardsilogic@0 71
lbajardsilogic@0 72 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 73 std::cerr << "FileReadThread::finish() exiting" << std::endl;
lbajardsilogic@0 74 #endif
lbajardsilogic@0 75 }
lbajardsilogic@0 76
lbajardsilogic@0 77 int
lbajardsilogic@0 78 FileReadThread::request(const Request &request)
lbajardsilogic@0 79 {
lbajardsilogic@0 80 m_mutex.lock();
lbajardsilogic@0 81
lbajardsilogic@0 82 int token = m_nextToken++;
lbajardsilogic@0 83 m_queue[token] = request;
lbajardsilogic@0 84
lbajardsilogic@0 85 m_mutex.unlock();
lbajardsilogic@0 86 m_condition.wakeAll();
lbajardsilogic@0 87
lbajardsilogic@0 88 return token;
lbajardsilogic@0 89 }
lbajardsilogic@0 90
lbajardsilogic@0 91 void
lbajardsilogic@0 92 FileReadThread::cancel(int token)
lbajardsilogic@0 93 {
lbajardsilogic@0 94 m_mutex.lock();
lbajardsilogic@0 95
lbajardsilogic@0 96 if (m_queue.find(token) != m_queue.end()) {
lbajardsilogic@0 97 m_cancelledRequests[token] = m_queue[token];
lbajardsilogic@0 98 m_queue.erase(token);
lbajardsilogic@0 99 m_newlyCancelled.insert(token);
lbajardsilogic@0 100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
lbajardsilogic@0 101 m_cancelledRequests[token] = m_readyRequests[token];
lbajardsilogic@0 102 m_readyRequests.erase(token);
lbajardsilogic@0 103 } else {
lbajardsilogic@0 104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
lbajardsilogic@0 105 }
lbajardsilogic@0 106
lbajardsilogic@0 107 m_mutex.unlock();
lbajardsilogic@0 108
lbajardsilogic@0 109 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
lbajardsilogic@0 111 #endif
lbajardsilogic@0 112
lbajardsilogic@0 113 m_condition.wakeAll();
lbajardsilogic@0 114 }
lbajardsilogic@0 115
lbajardsilogic@0 116 bool
lbajardsilogic@0 117 FileReadThread::isReady(int token)
lbajardsilogic@0 118 {
lbajardsilogic@0 119 m_mutex.lock();
lbajardsilogic@0 120
lbajardsilogic@0 121 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
lbajardsilogic@0 122
lbajardsilogic@0 123 m_mutex.unlock();
lbajardsilogic@0 124 return ready;
lbajardsilogic@0 125 }
lbajardsilogic@0 126
lbajardsilogic@0 127 bool
lbajardsilogic@0 128 FileReadThread::isCancelled(int token)
lbajardsilogic@0 129 {
lbajardsilogic@0 130 m_mutex.lock();
lbajardsilogic@0 131
lbajardsilogic@0 132 bool cancelled =
lbajardsilogic@0 133 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
lbajardsilogic@0 134 m_newlyCancelled.find(token) == m_newlyCancelled.end();
lbajardsilogic@0 135
lbajardsilogic@0 136 m_mutex.unlock();
lbajardsilogic@0 137 return cancelled;
lbajardsilogic@0 138 }
lbajardsilogic@0 139
lbajardsilogic@0 140 bool
lbajardsilogic@0 141 FileReadThread::getRequest(int token, Request &request)
lbajardsilogic@0 142 {
lbajardsilogic@0 143 m_mutex.lock();
lbajardsilogic@0 144
lbajardsilogic@0 145 bool found = false;
lbajardsilogic@0 146
lbajardsilogic@0 147 if (m_queue.find(token) != m_queue.end()) {
lbajardsilogic@0 148 request = m_queue[token];
lbajardsilogic@0 149 found = true;
lbajardsilogic@0 150 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
lbajardsilogic@0 151 request = m_cancelledRequests[token];
lbajardsilogic@0 152 found = true;
lbajardsilogic@0 153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
lbajardsilogic@0 154 request = m_readyRequests[token];
lbajardsilogic@0 155 found = true;
lbajardsilogic@0 156 }
lbajardsilogic@0 157
lbajardsilogic@0 158 m_mutex.unlock();
lbajardsilogic@0 159
lbajardsilogic@0 160 return found;
lbajardsilogic@0 161 }
lbajardsilogic@0 162
lbajardsilogic@0 163 void
lbajardsilogic@0 164 FileReadThread::done(int token)
lbajardsilogic@0 165 {
lbajardsilogic@0 166 m_mutex.lock();
lbajardsilogic@0 167
lbajardsilogic@0 168 bool found = false;
lbajardsilogic@0 169
lbajardsilogic@0 170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
lbajardsilogic@0 171 m_cancelledRequests.erase(token);
lbajardsilogic@0 172 m_newlyCancelled.erase(token);
lbajardsilogic@0 173 found = true;
lbajardsilogic@0 174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
lbajardsilogic@0 175 m_readyRequests.erase(token);
lbajardsilogic@0 176 found = true;
lbajardsilogic@0 177 } else if (m_queue.find(token) != m_queue.end()) {
lbajardsilogic@0 178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
lbajardsilogic@0 179 }
lbajardsilogic@0 180
lbajardsilogic@0 181 m_mutex.unlock();
lbajardsilogic@0 182
lbajardsilogic@0 183 if (!found) {
lbajardsilogic@0 184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
lbajardsilogic@0 185 }
lbajardsilogic@0 186 }
lbajardsilogic@0 187
lbajardsilogic@0 188 void
lbajardsilogic@0 189 FileReadThread::process()
lbajardsilogic@0 190 {
lbajardsilogic@0 191 // entered with m_mutex locked and m_queue non-empty
lbajardsilogic@0 192
lbajardsilogic@0 193 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 194 Profiler profiler("FileReadThread::process()", true);
lbajardsilogic@0 195 #endif
lbajardsilogic@0 196
lbajardsilogic@0 197 int token = m_queue.begin()->first;
lbajardsilogic@0 198 Request request = m_queue.begin()->second;
lbajardsilogic@0 199
lbajardsilogic@0 200 m_mutex.unlock();
lbajardsilogic@0 201
lbajardsilogic@0 202 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 203 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
lbajardsilogic@0 204 #endif
lbajardsilogic@0 205
lbajardsilogic@0 206 bool successful = false;
lbajardsilogic@0 207 bool seekFailed = false;
lbajardsilogic@0 208 ssize_t r = 0;
lbajardsilogic@0 209
lbajardsilogic@0 210 if (request.mutex) request.mutex->lock();
lbajardsilogic@0 211
lbajardsilogic@0 212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
lbajardsilogic@0 213 seekFailed = true;
lbajardsilogic@0 214 } else {
lbajardsilogic@0 215
lbajardsilogic@0 216 // if request.size is large, we want to avoid making a single
lbajardsilogic@0 217 // system call to read it all as it may block too much
lbajardsilogic@0 218
lbajardsilogic@0 219 static const size_t blockSize = 256 * 1024;
lbajardsilogic@0 220
lbajardsilogic@0 221 size_t size = request.size;
lbajardsilogic@0 222 char *destination = request.data;
lbajardsilogic@0 223
lbajardsilogic@0 224 while (size > 0) {
lbajardsilogic@0 225 size_t readSize = size;
lbajardsilogic@0 226 if (readSize > blockSize) readSize = blockSize;
lbajardsilogic@0 227 ssize_t br = ::read(request.fd, destination, readSize);
lbajardsilogic@0 228 if (br < 0) {
lbajardsilogic@0 229 r = br;
lbajardsilogic@0 230 break;
lbajardsilogic@0 231 } else {
lbajardsilogic@0 232 r += br;
lbajardsilogic@0 233 if (br < ssize_t(readSize)) break;
lbajardsilogic@0 234 }
lbajardsilogic@0 235 destination += readSize;
lbajardsilogic@0 236 size -= readSize;
lbajardsilogic@0 237 }
lbajardsilogic@0 238 }
lbajardsilogic@0 239
lbajardsilogic@0 240 if (request.mutex) request.mutex->unlock();
lbajardsilogic@0 241
lbajardsilogic@0 242 if (seekFailed) {
lbajardsilogic@0 243 ::perror("Seek failed");
lbajardsilogic@0 244 std::cerr << "ERROR: FileReadThread::process: seek to "
lbajardsilogic@0 245 << request.start << " failed" << std::endl;
lbajardsilogic@0 246 request.size = 0;
lbajardsilogic@0 247 } else {
lbajardsilogic@0 248 if (r < 0) {
lbajardsilogic@0 249 ::perror("ERROR: FileReadThread::process: Read failed");
lbajardsilogic@0 250 request.size = 0;
lbajardsilogic@0 251 } else if (r < ssize_t(request.size)) {
lbajardsilogic@0 252 std::cerr << "WARNING: FileReadThread::process: read "
lbajardsilogic@0 253 << request.size << " returned only " << r << " bytes"
lbajardsilogic@0 254 << std::endl;
lbajardsilogic@0 255 request.size = r;
lbajardsilogic@0 256 usleep(100000);
lbajardsilogic@0 257 } else {
lbajardsilogic@0 258 successful = true;
lbajardsilogic@0 259 }
lbajardsilogic@0 260 }
lbajardsilogic@0 261
lbajardsilogic@0 262 // Check that the token hasn't been cancelled and the thread
lbajardsilogic@0 263 // hasn't been asked to finish
lbajardsilogic@0 264
lbajardsilogic@0 265 m_mutex.lock();
lbajardsilogic@0 266
lbajardsilogic@0 267 request.successful = successful;
lbajardsilogic@0 268
lbajardsilogic@0 269 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
lbajardsilogic@0 270 m_queue.erase(token);
lbajardsilogic@0 271 m_readyRequests[token] = request;
lbajardsilogic@0 272 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 273 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
lbajardsilogic@0 274 #endif
lbajardsilogic@0 275 } else {
lbajardsilogic@0 276 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 277 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
lbajardsilogic@0 278 #endif
lbajardsilogic@0 279 }
lbajardsilogic@0 280 }
lbajardsilogic@0 281
lbajardsilogic@0 282 void
lbajardsilogic@0 283 FileReadThread::notifyCancelled()
lbajardsilogic@0 284 {
lbajardsilogic@0 285 // entered with m_mutex locked
lbajardsilogic@0 286
lbajardsilogic@0 287 while (!m_newlyCancelled.empty()) {
lbajardsilogic@0 288
lbajardsilogic@0 289 int token = *m_newlyCancelled.begin();
lbajardsilogic@0 290
lbajardsilogic@0 291 #ifdef DEBUG_FILE_READ_THREAD
lbajardsilogic@0 292 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
lbajardsilogic@0 293 #endif
lbajardsilogic@0 294
lbajardsilogic@0 295 m_newlyCancelled.erase(token);
lbajardsilogic@0 296 }
lbajardsilogic@0 297 }
lbajardsilogic@0 298
lbajardsilogic@0 299