annotate data/fileio/FileReadThread.cpp @ 993:60e2927b1475

Disconnect signals before abandoning reply object, avoiding "reply unknown" error
author Chris Cannam
date Wed, 08 Oct 2014 13:14:17 +0100
parents e802e550a1f2
children c811991a5efa
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
Chris@148 16 #include "FileReadThread.h"
Chris@148 17
Chris@148 18 #include "base/Profiler.h"
Chris@408 19 #include "base/Thread.h"
Chris@148 20
Chris@148 21 #include <iostream>
Chris@148 22 #include <unistd.h>
Chris@658 23 #include <cstdio>
Chris@148 24
Chris@455 25 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 26
Chris@148 27 FileReadThread::FileReadThread() :
Chris@148 28 m_nextToken(0),
Chris@148 29 m_exiting(false)
Chris@148 30 {
Chris@148 31 }
Chris@148 32
Chris@148 33 void
Chris@148 34 FileReadThread::run()
Chris@148 35 {
Chris@408 36 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148 37
Chris@148 38 while (!m_exiting) {
Chris@148 39 if (m_queue.empty()) {
Chris@148 40 m_condition.wait(&m_mutex, 1000);
Chris@148 41 } else {
Chris@148 42 process();
Chris@148 43 }
Chris@148 44 notifyCancelled();
Chris@148 45 }
Chris@148 46
Chris@148 47 notifyCancelled();
Chris@148 48
Chris@148 49 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 50 SVDEBUG << "FileReadThread::run() exiting" << endl;
Chris@148 51 #endif
Chris@148 52 }
Chris@148 53
Chris@148 54 void
Chris@148 55 FileReadThread::finish()
Chris@148 56 {
Chris@148 57 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 58 SVDEBUG << "FileReadThread::finish()" << endl;
Chris@148 59 #endif
Chris@148 60
Chris@408 61 {
Chris@408 62 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408 63
Chris@408 64 while (!m_queue.empty()) {
Chris@408 65 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408 66 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408 67 m_queue.erase(m_queue.begin());
Chris@408 68 }
Chris@408 69
Chris@408 70 m_exiting = true;
Chris@148 71 }
Chris@148 72
Chris@148 73 m_condition.wakeAll();
Chris@148 74
Chris@148 75 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 76 SVDEBUG << "FileReadThread::finish() exiting" << endl;
Chris@148 77 #endif
Chris@148 78 }
Chris@148 79
Chris@148 80 int
Chris@148 81 FileReadThread::request(const Request &request)
Chris@148 82 {
Chris@408 83 int token;
Chris@408 84
Chris@408 85 {
Chris@408 86 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148 87
Chris@408 88 token = m_nextToken++;
Chris@408 89 m_queue[token] = request;
Chris@408 90 }
Chris@148 91
Chris@148 92 m_condition.wakeAll();
Chris@148 93
Chris@148 94 return token;
Chris@148 95 }
Chris@148 96
Chris@148 97 void
Chris@148 98 FileReadThread::cancel(int token)
Chris@148 99 {
Chris@408 100 {
Chris@408 101 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148 102
Chris@408 103 if (m_queue.find(token) != m_queue.end()) {
Chris@408 104 m_cancelledRequests[token] = m_queue[token];
Chris@408 105 m_queue.erase(token);
Chris@408 106 m_newlyCancelled.insert(token);
Chris@408 107 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408 108 m_cancelledRequests[token] = m_readyRequests[token];
Chris@408 109 m_readyRequests.erase(token);
Chris@408 110 } else {
Chris@843 111 cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
Chris@408 112 }
Chris@148 113 }
Chris@148 114
Chris@148 115 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 116 SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
Chris@148 117 #endif
Chris@148 118
Chris@148 119 m_condition.wakeAll();
Chris@148 120 }
Chris@148 121
Chris@148 122 bool
Chris@148 123 FileReadThread::isReady(int token)
Chris@148 124 {
Chris@408 125 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148 126
Chris@148 127 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 128
Chris@148 129 return ready;
Chris@148 130 }
Chris@148 131
Chris@148 132 bool
Chris@148 133 FileReadThread::isCancelled(int token)
Chris@148 134 {
Chris@408 135 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148 136
Chris@148 137 bool cancelled =
Chris@148 138 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 139 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 140
Chris@148 141 return cancelled;
Chris@148 142 }
Chris@148 143
Chris@148 144 bool
Chris@455 145 FileReadThread::haveRequest(int token)
Chris@455 146 {
Chris@455 147 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
Chris@455 148
Chris@455 149 bool found = false;
Chris@455 150
Chris@455 151 if (m_queue.find(token) != m_queue.end()) {
Chris@455 152 found = true;
Chris@455 153 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@455 154 found = true;
Chris@455 155 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@455 156 found = true;
Chris@455 157 }
Chris@455 158
Chris@455 159 return found;
Chris@455 160 }
Chris@455 161
Chris@455 162 bool
Chris@148 163 FileReadThread::getRequest(int token, Request &request)
Chris@148 164 {
Chris@408 165 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148 166
Chris@148 167 bool found = false;
Chris@148 168
Chris@148 169 if (m_queue.find(token) != m_queue.end()) {
Chris@148 170 request = m_queue[token];
Chris@148 171 found = true;
Chris@148 172 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 173 request = m_cancelledRequests[token];
Chris@148 174 found = true;
Chris@148 175 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 176 request = m_readyRequests[token];
Chris@148 177 found = true;
Chris@148 178 }
Chris@148 179
Chris@148 180 return found;
Chris@148 181 }
Chris@148 182
Chris@148 183 void
Chris@148 184 FileReadThread::done(int token)
Chris@148 185 {
Chris@408 186 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148 187
Chris@148 188 bool found = false;
Chris@148 189
Chris@148 190 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 191 m_cancelledRequests.erase(token);
Chris@148 192 m_newlyCancelled.erase(token);
Chris@148 193 found = true;
Chris@148 194 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 195 m_readyRequests.erase(token);
Chris@148 196 found = true;
Chris@148 197 } else if (m_queue.find(token) != m_queue.end()) {
Chris@843 198 cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
Chris@148 199 }
Chris@148 200
Chris@148 201 if (!found) {
Chris@843 202 cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
Chris@148 203 }
Chris@148 204 }
Chris@148 205
Chris@148 206 void
Chris@148 207 FileReadThread::process()
Chris@148 208 {
Chris@148 209 // entered with m_mutex locked and m_queue non-empty
Chris@148 210
Chris@408 211 Profiler profiler("FileReadThread::process", true);
Chris@148 212
Chris@148 213 int token = m_queue.begin()->first;
Chris@148 214 Request request = m_queue.begin()->second;
Chris@148 215
Chris@148 216 m_mutex.unlock();
Chris@148 217
Chris@148 218 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 219 SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
Chris@148 220 #endif
Chris@148 221
Chris@148 222 bool successful = false;
Chris@148 223 bool seekFailed = false;
Chris@148 224 ssize_t r = 0;
Chris@148 225
Chris@408 226 {
Chris@408 227 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148 228
Chris@408 229 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408 230 seekFailed = true;
Chris@408 231 } else {
Chris@148 232
Chris@408 233 // if request.size is large, we want to avoid making a single
Chris@408 234 // system call to read it all as it may block too much
Chris@408 235
Chris@408 236 static const size_t blockSize = 256 * 1024;
Chris@408 237
Chris@408 238 size_t size = request.size;
Chris@408 239 char *destination = request.data;
Chris@408 240
Chris@408 241 while (size > 0) {
Chris@408 242 size_t readSize = size;
Chris@408 243 if (readSize > blockSize) readSize = blockSize;
Chris@408 244 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408 245 if (br < 0) {
Chris@408 246 r = br;
Chris@408 247 break;
Chris@408 248 } else {
Chris@408 249 r += br;
Chris@408 250 if (br < ssize_t(readSize)) break;
Chris@408 251 }
Chris@408 252 destination += readSize;
Chris@408 253 size -= readSize;
Chris@148 254 }
Chris@148 255 }
Chris@148 256 }
Chris@148 257
Chris@148 258 if (seekFailed) {
Chris@148 259 ::perror("Seek failed");
Chris@843 260 cerr << "ERROR: FileReadThread::process: seek to "
Chris@843 261 << request.start << " failed" << endl;
Chris@148 262 request.size = 0;
Chris@148 263 } else {
Chris@148 264 if (r < 0) {
Chris@148 265 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@843 266 cerr << "ERROR: FileReadThread::process: read of "
Chris@455 267 << request.size << " at "
Chris@843 268 << request.start << " failed" << endl;
Chris@148 269 request.size = 0;
Chris@148 270 } else if (r < ssize_t(request.size)) {
Chris@843 271 cerr << "WARNING: FileReadThread::process: read "
Chris@148 272 << request.size << " returned only " << r << " bytes"
Chris@843 273 << endl;
Chris@148 274 request.size = r;
Chris@148 275 usleep(100000);
Chris@148 276 } else {
Chris@148 277 successful = true;
Chris@148 278 }
Chris@148 279 }
Chris@148 280
Chris@148 281 // Check that the token hasn't been cancelled and the thread
Chris@148 282 // hasn't been asked to finish
Chris@148 283
Chris@148 284 m_mutex.lock();
Chris@148 285
Chris@148 286 request.successful = successful;
Chris@148 287
Chris@148 288 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 289 m_queue.erase(token);
Chris@148 290 m_readyRequests[token] = request;
Chris@148 291 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 292 SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
Chris@148 293 #endif
Chris@148 294 } else {
Chris@148 295 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 296 if (m_exiting) {
Chris@690 297 SVDEBUG << "FileReadThread::process: exiting" << endl;
Chris@455 298 } else {
Chris@690 299 SVDEBUG << "FileReadThread::process: request disappeared" << endl;
Chris@455 300 }
Chris@148 301 #endif
Chris@148 302 }
Chris@148 303 }
Chris@148 304
Chris@148 305 void
Chris@148 306 FileReadThread::notifyCancelled()
Chris@148 307 {
Chris@148 308 // entered with m_mutex locked
Chris@148 309
Chris@148 310 while (!m_newlyCancelled.empty()) {
Chris@148 311
Chris@148 312 int token = *m_newlyCancelled.begin();
Chris@148 313
Chris@148 314 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 315 SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
Chris@148 316 #endif
Chris@148 317
Chris@148 318 m_newlyCancelled.erase(token);
Chris@148 319 }
Chris@148 320 }
Chris@148 321
Chris@148 322