annotate data/fileio/FileReadThread.cpp @ 512:e7a39c45a7a4

* close output streams a little sooner when writing features for more than one file (doesn't affect eventual output though)
author Chris Cannam
date Tue, 09 Dec 2008 16:53:37 +0000
parents 3e0f1f7bec85
children 29efe322ab47
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
Chris@148 16 #include "FileReadThread.h"
Chris@148 17
Chris@148 18 #include "base/Profiler.h"
Chris@408 19 #include "base/Thread.h"
Chris@148 20
Chris@148 21 #include <iostream>
Chris@148 22 #include <unistd.h>
Chris@148 23
Chris@455 24 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 25
Chris@148 26 FileReadThread::FileReadThread() :
Chris@148 27 m_nextToken(0),
Chris@148 28 m_exiting(false)
Chris@148 29 {
Chris@148 30 }
Chris@148 31
Chris@148 32 void
Chris@148 33 FileReadThread::run()
Chris@148 34 {
Chris@408 35 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148 36
Chris@148 37 while (!m_exiting) {
Chris@148 38 if (m_queue.empty()) {
Chris@148 39 m_condition.wait(&m_mutex, 1000);
Chris@148 40 } else {
Chris@148 41 process();
Chris@148 42 }
Chris@148 43 notifyCancelled();
Chris@148 44 }
Chris@148 45
Chris@148 46 notifyCancelled();
Chris@148 47
Chris@148 48 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 49 std::cerr << "FileReadThread::run() exiting" << std::endl;
Chris@148 50 #endif
Chris@148 51 }
Chris@148 52
Chris@148 53 void
Chris@148 54 FileReadThread::finish()
Chris@148 55 {
Chris@148 56 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 57 std::cerr << "FileReadThread::finish()" << std::endl;
Chris@148 58 #endif
Chris@148 59
Chris@408 60 {
Chris@408 61 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408 62
Chris@408 63 while (!m_queue.empty()) {
Chris@408 64 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408 65 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408 66 m_queue.erase(m_queue.begin());
Chris@408 67 }
Chris@408 68
Chris@408 69 m_exiting = true;
Chris@148 70 }
Chris@148 71
Chris@148 72 m_condition.wakeAll();
Chris@148 73
Chris@148 74 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 75 std::cerr << "FileReadThread::finish() exiting" << std::endl;
Chris@148 76 #endif
Chris@148 77 }
Chris@148 78
Chris@148 79 int
Chris@148 80 FileReadThread::request(const Request &request)
Chris@148 81 {
Chris@408 82 int token;
Chris@408 83
Chris@408 84 {
Chris@408 85 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148 86
Chris@408 87 token = m_nextToken++;
Chris@408 88 m_queue[token] = request;
Chris@408 89 }
Chris@148 90
Chris@148 91 m_condition.wakeAll();
Chris@148 92
Chris@148 93 return token;
Chris@148 94 }
Chris@148 95
Chris@148 96 void
Chris@148 97 FileReadThread::cancel(int token)
Chris@148 98 {
Chris@408 99 {
Chris@408 100 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148 101
Chris@408 102 if (m_queue.find(token) != m_queue.end()) {
Chris@408 103 m_cancelledRequests[token] = m_queue[token];
Chris@408 104 m_queue.erase(token);
Chris@408 105 m_newlyCancelled.insert(token);
Chris@408 106 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408 107 m_cancelledRequests[token] = m_readyRequests[token];
Chris@408 108 m_readyRequests.erase(token);
Chris@408 109 } else {
Chris@408 110 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
Chris@408 111 }
Chris@148 112 }
Chris@148 113
Chris@148 114 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 115 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
Chris@148 116 #endif
Chris@148 117
Chris@148 118 m_condition.wakeAll();
Chris@148 119 }
Chris@148 120
Chris@148 121 bool
Chris@148 122 FileReadThread::isReady(int token)
Chris@148 123 {
Chris@408 124 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148 125
Chris@148 126 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 127
Chris@148 128 return ready;
Chris@148 129 }
Chris@148 130
Chris@148 131 bool
Chris@148 132 FileReadThread::isCancelled(int token)
Chris@148 133 {
Chris@408 134 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148 135
Chris@148 136 bool cancelled =
Chris@148 137 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 138 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 139
Chris@148 140 return cancelled;
Chris@148 141 }
Chris@148 142
Chris@148 143 bool
Chris@455 144 FileReadThread::haveRequest(int token)
Chris@455 145 {
Chris@455 146 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
Chris@455 147
Chris@455 148 bool found = false;
Chris@455 149
Chris@455 150 if (m_queue.find(token) != m_queue.end()) {
Chris@455 151 found = true;
Chris@455 152 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@455 153 found = true;
Chris@455 154 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@455 155 found = true;
Chris@455 156 }
Chris@455 157
Chris@455 158 return found;
Chris@455 159 }
Chris@455 160
Chris@455 161 bool
Chris@148 162 FileReadThread::getRequest(int token, Request &request)
Chris@148 163 {
Chris@408 164 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148 165
Chris@148 166 bool found = false;
Chris@148 167
Chris@148 168 if (m_queue.find(token) != m_queue.end()) {
Chris@148 169 request = m_queue[token];
Chris@148 170 found = true;
Chris@148 171 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 172 request = m_cancelledRequests[token];
Chris@148 173 found = true;
Chris@148 174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 175 request = m_readyRequests[token];
Chris@148 176 found = true;
Chris@148 177 }
Chris@148 178
Chris@148 179 return found;
Chris@148 180 }
Chris@148 181
Chris@148 182 void
Chris@148 183 FileReadThread::done(int token)
Chris@148 184 {
Chris@408 185 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148 186
Chris@148 187 bool found = false;
Chris@148 188
Chris@148 189 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 190 m_cancelledRequests.erase(token);
Chris@148 191 m_newlyCancelled.erase(token);
Chris@148 192 found = true;
Chris@148 193 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 194 m_readyRequests.erase(token);
Chris@148 195 found = true;
Chris@148 196 } else if (m_queue.find(token) != m_queue.end()) {
Chris@148 197 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
Chris@148 198 }
Chris@148 199
Chris@148 200 if (!found) {
Chris@148 201 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
Chris@148 202 }
Chris@148 203 }
Chris@148 204
Chris@148 205 void
Chris@148 206 FileReadThread::process()
Chris@148 207 {
Chris@148 208 // entered with m_mutex locked and m_queue non-empty
Chris@148 209
Chris@408 210 Profiler profiler("FileReadThread::process", true);
Chris@148 211
Chris@148 212 int token = m_queue.begin()->first;
Chris@148 213 Request request = m_queue.begin()->second;
Chris@148 214
Chris@148 215 m_mutex.unlock();
Chris@148 216
Chris@148 217 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 218 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
Chris@148 219 #endif
Chris@148 220
Chris@148 221 bool successful = false;
Chris@148 222 bool seekFailed = false;
Chris@148 223 ssize_t r = 0;
Chris@148 224
Chris@408 225 {
Chris@408 226 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148 227
Chris@408 228 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408 229 seekFailed = true;
Chris@408 230 } else {
Chris@148 231
Chris@408 232 // if request.size is large, we want to avoid making a single
Chris@408 233 // system call to read it all as it may block too much
Chris@408 234
Chris@408 235 static const size_t blockSize = 256 * 1024;
Chris@408 236
Chris@408 237 size_t size = request.size;
Chris@408 238 char *destination = request.data;
Chris@408 239
Chris@408 240 while (size > 0) {
Chris@408 241 size_t readSize = size;
Chris@408 242 if (readSize > blockSize) readSize = blockSize;
Chris@408 243 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408 244 if (br < 0) {
Chris@408 245 r = br;
Chris@408 246 break;
Chris@408 247 } else {
Chris@408 248 r += br;
Chris@408 249 if (br < ssize_t(readSize)) break;
Chris@408 250 }
Chris@408 251 destination += readSize;
Chris@408 252 size -= readSize;
Chris@148 253 }
Chris@148 254 }
Chris@148 255 }
Chris@148 256
Chris@148 257 if (seekFailed) {
Chris@148 258 ::perror("Seek failed");
Chris@148 259 std::cerr << "ERROR: FileReadThread::process: seek to "
Chris@148 260 << request.start << " failed" << std::endl;
Chris@148 261 request.size = 0;
Chris@148 262 } else {
Chris@148 263 if (r < 0) {
Chris@148 264 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@455 265 std::cerr << "ERROR: FileReadThread::process: read of "
Chris@455 266 << request.size << " at "
Chris@455 267 << request.start << " failed" << std::endl;
Chris@148 268 request.size = 0;
Chris@148 269 } else if (r < ssize_t(request.size)) {
Chris@148 270 std::cerr << "WARNING: FileReadThread::process: read "
Chris@148 271 << request.size << " returned only " << r << " bytes"
Chris@148 272 << std::endl;
Chris@148 273 request.size = r;
Chris@148 274 usleep(100000);
Chris@148 275 } else {
Chris@148 276 successful = true;
Chris@148 277 }
Chris@148 278 }
Chris@148 279
Chris@148 280 // Check that the token hasn't been cancelled and the thread
Chris@148 281 // hasn't been asked to finish
Chris@148 282
Chris@148 283 m_mutex.lock();
Chris@148 284
Chris@148 285 request.successful = successful;
Chris@148 286
Chris@148 287 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 288 m_queue.erase(token);
Chris@148 289 m_readyRequests[token] = request;
Chris@148 290 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 291 std::cerr << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << std::endl;
Chris@148 292 #endif
Chris@148 293 } else {
Chris@148 294 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 295 if (m_exiting) {
Chris@455 296 std::cerr << "FileReadThread::process: exiting" << std::endl;
Chris@455 297 } else {
Chris@455 298 std::cerr << "FileReadThread::process: request disappeared" << std::endl;
Chris@455 299 }
Chris@148 300 #endif
Chris@148 301 }
Chris@148 302 }
Chris@148 303
Chris@148 304 void
Chris@148 305 FileReadThread::notifyCancelled()
Chris@148 306 {
Chris@148 307 // entered with m_mutex locked
Chris@148 308
Chris@148 309 while (!m_newlyCancelled.empty()) {
Chris@148 310
Chris@148 311 int token = *m_newlyCancelled.begin();
Chris@148 312
Chris@148 313 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 314 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
Chris@148 315 #endif
Chris@148 316
Chris@148 317 m_newlyCancelled.erase(token);
Chris@148 318 }
Chris@148 319 }
Chris@148 320
Chris@148 321