annotate data/fileio/FileReadThread.cpp @ 450:d8a2c28ba9f6

* Query range before time (in case time component of range turns out to be synonymous with time component of time)
author Chris Cannam
date Tue, 07 Oct 2008 12:59:55 +0000
parents 115f60df1e4d
children 1f15beefcd76
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
#include "FileReadThread.h"

#include "base/Profiler.h"
#include "base/Thread.h"

#include <cerrno>
#include <cstdio>
#include <iostream>
#include <unistd.h>
Chris@148 23
Chris@148 24 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 25
Chris@148 26 FileReadThread::FileReadThread() :
Chris@148 27 m_nextToken(0),
Chris@148 28 m_exiting(false)
Chris@148 29 {
Chris@148 30 }
Chris@148 31
Chris@148 32 void
Chris@148 33 FileReadThread::run()
Chris@148 34 {
Chris@408 35 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148 36
Chris@148 37 while (!m_exiting) {
Chris@148 38 if (m_queue.empty()) {
Chris@148 39 m_condition.wait(&m_mutex, 1000);
Chris@148 40 } else {
Chris@148 41 process();
Chris@148 42 }
Chris@148 43 notifyCancelled();
Chris@148 44 }
Chris@148 45
Chris@148 46 notifyCancelled();
Chris@148 47
Chris@148 48 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 49 std::cerr << "FileReadThread::run() exiting" << std::endl;
Chris@148 50 #endif
Chris@148 51 }
Chris@148 52
Chris@148 53 void
Chris@148 54 FileReadThread::finish()
Chris@148 55 {
Chris@148 56 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 57 std::cerr << "FileReadThread::finish()" << std::endl;
Chris@148 58 #endif
Chris@148 59
Chris@408 60 {
Chris@408 61 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408 62
Chris@408 63 while (!m_queue.empty()) {
Chris@408 64 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408 65 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408 66 m_queue.erase(m_queue.begin());
Chris@408 67 }
Chris@408 68
Chris@408 69 m_exiting = true;
Chris@148 70 }
Chris@148 71
Chris@148 72 m_condition.wakeAll();
Chris@148 73
Chris@148 74 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 75 std::cerr << "FileReadThread::finish() exiting" << std::endl;
Chris@148 76 #endif
Chris@148 77 }
Chris@148 78
Chris@148 79 int
Chris@148 80 FileReadThread::request(const Request &request)
Chris@148 81 {
Chris@408 82 int token;
Chris@408 83
Chris@408 84 {
Chris@408 85 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148 86
Chris@408 87 token = m_nextToken++;
Chris@408 88 m_queue[token] = request;
Chris@408 89 }
Chris@148 90
Chris@148 91 m_condition.wakeAll();
Chris@148 92
Chris@148 93 return token;
Chris@148 94 }
Chris@148 95
Chris@148 96 void
Chris@148 97 FileReadThread::cancel(int token)
Chris@148 98 {
Chris@408 99 {
Chris@408 100 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148 101
Chris@408 102 if (m_queue.find(token) != m_queue.end()) {
Chris@408 103 m_cancelledRequests[token] = m_queue[token];
Chris@408 104 m_queue.erase(token);
Chris@408 105 m_newlyCancelled.insert(token);
Chris@408 106 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408 107 m_cancelledRequests[token] = m_readyRequests[token];
Chris@408 108 m_readyRequests.erase(token);
Chris@408 109 } else {
Chris@408 110 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
Chris@408 111 }
Chris@148 112 }
Chris@148 113
Chris@148 114 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 115 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
Chris@148 116 #endif
Chris@148 117
Chris@148 118 m_condition.wakeAll();
Chris@148 119 }
Chris@148 120
Chris@148 121 bool
Chris@148 122 FileReadThread::isReady(int token)
Chris@148 123 {
Chris@408 124 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148 125
Chris@148 126 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 127
Chris@148 128 return ready;
Chris@148 129 }
Chris@148 130
Chris@148 131 bool
Chris@148 132 FileReadThread::isCancelled(int token)
Chris@148 133 {
Chris@408 134 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148 135
Chris@148 136 bool cancelled =
Chris@148 137 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 138 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 139
Chris@148 140 return cancelled;
Chris@148 141 }
Chris@148 142
Chris@148 143 bool
Chris@148 144 FileReadThread::getRequest(int token, Request &request)
Chris@148 145 {
Chris@408 146 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148 147
Chris@148 148 bool found = false;
Chris@148 149
Chris@148 150 if (m_queue.find(token) != m_queue.end()) {
Chris@148 151 request = m_queue[token];
Chris@148 152 found = true;
Chris@148 153 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 154 request = m_cancelledRequests[token];
Chris@148 155 found = true;
Chris@148 156 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 157 request = m_readyRequests[token];
Chris@148 158 found = true;
Chris@148 159 }
Chris@148 160
Chris@148 161 return found;
Chris@148 162 }
Chris@148 163
Chris@148 164 void
Chris@148 165 FileReadThread::done(int token)
Chris@148 166 {
Chris@408 167 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148 168
Chris@148 169 bool found = false;
Chris@148 170
Chris@148 171 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 172 m_cancelledRequests.erase(token);
Chris@148 173 m_newlyCancelled.erase(token);
Chris@148 174 found = true;
Chris@148 175 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 176 m_readyRequests.erase(token);
Chris@148 177 found = true;
Chris@148 178 } else if (m_queue.find(token) != m_queue.end()) {
Chris@148 179 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
Chris@148 180 }
Chris@148 181
Chris@148 182 if (!found) {
Chris@148 183 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
Chris@148 184 }
Chris@148 185 }
Chris@148 186
Chris@148 187 void
Chris@148 188 FileReadThread::process()
Chris@148 189 {
Chris@148 190 // entered with m_mutex locked and m_queue non-empty
Chris@148 191
Chris@408 192 Profiler profiler("FileReadThread::process", true);
Chris@148 193
Chris@148 194 int token = m_queue.begin()->first;
Chris@148 195 Request request = m_queue.begin()->second;
Chris@148 196
Chris@148 197 m_mutex.unlock();
Chris@148 198
Chris@148 199 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 200 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
Chris@148 201 #endif
Chris@148 202
Chris@148 203 bool successful = false;
Chris@148 204 bool seekFailed = false;
Chris@148 205 ssize_t r = 0;
Chris@148 206
Chris@408 207 {
Chris@408 208 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148 209
Chris@408 210 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408 211 seekFailed = true;
Chris@408 212 } else {
Chris@148 213
Chris@408 214 // if request.size is large, we want to avoid making a single
Chris@408 215 // system call to read it all as it may block too much
Chris@408 216
Chris@408 217 static const size_t blockSize = 256 * 1024;
Chris@408 218
Chris@408 219 size_t size = request.size;
Chris@408 220 char *destination = request.data;
Chris@408 221
Chris@408 222 while (size > 0) {
Chris@408 223 size_t readSize = size;
Chris@408 224 if (readSize > blockSize) readSize = blockSize;
Chris@408 225 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408 226 if (br < 0) {
Chris@408 227 r = br;
Chris@408 228 break;
Chris@408 229 } else {
Chris@408 230 r += br;
Chris@408 231 if (br < ssize_t(readSize)) break;
Chris@408 232 }
Chris@408 233 destination += readSize;
Chris@408 234 size -= readSize;
Chris@148 235 }
Chris@148 236 }
Chris@148 237 }
Chris@148 238
Chris@148 239 if (seekFailed) {
Chris@148 240 ::perror("Seek failed");
Chris@148 241 std::cerr << "ERROR: FileReadThread::process: seek to "
Chris@148 242 << request.start << " failed" << std::endl;
Chris@148 243 request.size = 0;
Chris@148 244 } else {
Chris@148 245 if (r < 0) {
Chris@148 246 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@148 247 request.size = 0;
Chris@148 248 } else if (r < ssize_t(request.size)) {
Chris@148 249 std::cerr << "WARNING: FileReadThread::process: read "
Chris@148 250 << request.size << " returned only " << r << " bytes"
Chris@148 251 << std::endl;
Chris@148 252 request.size = r;
Chris@148 253 usleep(100000);
Chris@148 254 } else {
Chris@148 255 successful = true;
Chris@148 256 }
Chris@148 257 }
Chris@148 258
Chris@148 259 // Check that the token hasn't been cancelled and the thread
Chris@148 260 // hasn't been asked to finish
Chris@148 261
Chris@148 262 m_mutex.lock();
Chris@148 263
Chris@148 264 request.successful = successful;
Chris@148 265
Chris@148 266 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 267 m_queue.erase(token);
Chris@148 268 m_readyRequests[token] = request;
Chris@148 269 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 270 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
Chris@148 271 #endif
Chris@148 272 } else {
Chris@148 273 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 274 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
Chris@148 275 #endif
Chris@148 276 }
Chris@148 277 }
Chris@148 278
Chris@148 279 void
Chris@148 280 FileReadThread::notifyCancelled()
Chris@148 281 {
Chris@148 282 // entered with m_mutex locked
Chris@148 283
Chris@148 284 while (!m_newlyCancelled.empty()) {
Chris@148 285
Chris@148 286 int token = *m_newlyCancelled.begin();
Chris@148 287
Chris@148 288 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 289 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
Chris@148 290 #endif
Chris@148 291
Chris@148 292 m_newlyCancelled.erase(token);
Chris@148 293 }
Chris@148 294 }
Chris@148 295
Chris@148 296