annotate data/fileio/FileReadThread.cpp @ 1346:75ad55315db4 3.0-integration

More work on getting tests (especially file encoding ones) running on Windows. Various problems here to do with interaction with test filenames in Hg repos
author Chris Cannam
date Fri, 06 Jan 2017 15:44:55 +0000
parents c811991a5efa
children
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
Chris@148 16 #include "FileReadThread.h"
Chris@148 17
Chris@148 18 #include "base/Profiler.h"
Chris@408 19 #include "base/Thread.h"
Chris@148 20
Chris@148 21 #include <iostream>
Chris@1216 22
Chris@1216 23 #ifdef _MSC_VER
Chris@1216 24 #include <io.h>
Chris@1216 25 #define _lseek lseek
Chris@1216 26 #else
Chris@148 27 #include <unistd.h>
Chris@1216 28 #endif
Chris@1216 29
Chris@658 30 #include <cstdio>
Chris@148 31
Chris@455 32 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 33
Chris@148 34 FileReadThread::FileReadThread() :
Chris@148 35 m_nextToken(0),
Chris@148 36 m_exiting(false)
Chris@148 37 {
Chris@148 38 }
Chris@148 39
Chris@148 40 void
Chris@148 41 FileReadThread::run()
Chris@148 42 {
Chris@408 43 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148 44
Chris@148 45 while (!m_exiting) {
Chris@148 46 if (m_queue.empty()) {
Chris@148 47 m_condition.wait(&m_mutex, 1000);
Chris@148 48 } else {
Chris@148 49 process();
Chris@148 50 }
Chris@148 51 notifyCancelled();
Chris@148 52 }
Chris@148 53
Chris@148 54 notifyCancelled();
Chris@148 55
Chris@148 56 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 57 SVDEBUG << "FileReadThread::run() exiting" << endl;
Chris@148 58 #endif
Chris@148 59 }
Chris@148 60
Chris@148 61 void
Chris@148 62 FileReadThread::finish()
Chris@148 63 {
Chris@148 64 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 65 SVDEBUG << "FileReadThread::finish()" << endl;
Chris@148 66 #endif
Chris@148 67
Chris@408 68 {
Chris@408 69 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408 70
Chris@408 71 while (!m_queue.empty()) {
Chris@408 72 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408 73 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408 74 m_queue.erase(m_queue.begin());
Chris@408 75 }
Chris@408 76
Chris@408 77 m_exiting = true;
Chris@148 78 }
Chris@148 79
Chris@148 80 m_condition.wakeAll();
Chris@148 81
Chris@148 82 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 83 SVDEBUG << "FileReadThread::finish() exiting" << endl;
Chris@148 84 #endif
Chris@148 85 }
Chris@148 86
Chris@148 87 int
Chris@148 88 FileReadThread::request(const Request &request)
Chris@148 89 {
Chris@408 90 int token;
Chris@408 91
Chris@408 92 {
Chris@408 93 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148 94
Chris@408 95 token = m_nextToken++;
Chris@408 96 m_queue[token] = request;
Chris@408 97 }
Chris@148 98
Chris@148 99 m_condition.wakeAll();
Chris@148 100
Chris@148 101 return token;
Chris@148 102 }
Chris@148 103
Chris@148 104 void
Chris@148 105 FileReadThread::cancel(int token)
Chris@148 106 {
Chris@408 107 {
Chris@408 108 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148 109
Chris@408 110 if (m_queue.find(token) != m_queue.end()) {
Chris@408 111 m_cancelledRequests[token] = m_queue[token];
Chris@408 112 m_queue.erase(token);
Chris@408 113 m_newlyCancelled.insert(token);
Chris@408 114 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408 115 m_cancelledRequests[token] = m_readyRequests[token];
Chris@408 116 m_readyRequests.erase(token);
Chris@408 117 } else {
Chris@843 118 cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
Chris@408 119 }
Chris@148 120 }
Chris@148 121
Chris@148 122 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 123 SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
Chris@148 124 #endif
Chris@148 125
Chris@148 126 m_condition.wakeAll();
Chris@148 127 }
Chris@148 128
Chris@148 129 bool
Chris@148 130 FileReadThread::isReady(int token)
Chris@148 131 {
Chris@408 132 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148 133
Chris@148 134 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 135
Chris@148 136 return ready;
Chris@148 137 }
Chris@148 138
Chris@148 139 bool
Chris@148 140 FileReadThread::isCancelled(int token)
Chris@148 141 {
Chris@408 142 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148 143
Chris@148 144 bool cancelled =
Chris@148 145 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 146 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 147
Chris@148 148 return cancelled;
Chris@148 149 }
Chris@148 150
Chris@148 151 bool
Chris@455 152 FileReadThread::haveRequest(int token)
Chris@455 153 {
Chris@455 154 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
Chris@455 155
Chris@455 156 bool found = false;
Chris@455 157
Chris@455 158 if (m_queue.find(token) != m_queue.end()) {
Chris@455 159 found = true;
Chris@455 160 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@455 161 found = true;
Chris@455 162 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@455 163 found = true;
Chris@455 164 }
Chris@455 165
Chris@455 166 return found;
Chris@455 167 }
Chris@455 168
Chris@455 169 bool
Chris@148 170 FileReadThread::getRequest(int token, Request &request)
Chris@148 171 {
Chris@408 172 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148 173
Chris@148 174 bool found = false;
Chris@148 175
Chris@148 176 if (m_queue.find(token) != m_queue.end()) {
Chris@148 177 request = m_queue[token];
Chris@148 178 found = true;
Chris@148 179 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 180 request = m_cancelledRequests[token];
Chris@148 181 found = true;
Chris@148 182 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 183 request = m_readyRequests[token];
Chris@148 184 found = true;
Chris@148 185 }
Chris@148 186
Chris@148 187 return found;
Chris@148 188 }
Chris@148 189
Chris@148 190 void
Chris@148 191 FileReadThread::done(int token)
Chris@148 192 {
Chris@408 193 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148 194
Chris@148 195 bool found = false;
Chris@148 196
Chris@148 197 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 198 m_cancelledRequests.erase(token);
Chris@148 199 m_newlyCancelled.erase(token);
Chris@148 200 found = true;
Chris@148 201 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 202 m_readyRequests.erase(token);
Chris@148 203 found = true;
Chris@148 204 } else if (m_queue.find(token) != m_queue.end()) {
Chris@843 205 cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
Chris@148 206 }
Chris@148 207
Chris@148 208 if (!found) {
Chris@843 209 cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
Chris@148 210 }
Chris@148 211 }
Chris@148 212
Chris@148 213 void
Chris@148 214 FileReadThread::process()
Chris@148 215 {
Chris@148 216 // entered with m_mutex locked and m_queue non-empty
Chris@148 217
Chris@408 218 Profiler profiler("FileReadThread::process", true);
Chris@148 219
Chris@148 220 int token = m_queue.begin()->first;
Chris@148 221 Request request = m_queue.begin()->second;
Chris@148 222
Chris@148 223 m_mutex.unlock();
Chris@148 224
Chris@148 225 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 226 SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
Chris@148 227 #endif
Chris@148 228
Chris@148 229 bool successful = false;
Chris@148 230 bool seekFailed = false;
Chris@148 231 ssize_t r = 0;
Chris@148 232
Chris@408 233 {
Chris@408 234 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148 235
Chris@408 236 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408 237 seekFailed = true;
Chris@408 238 } else {
Chris@148 239
Chris@408 240 // if request.size is large, we want to avoid making a single
Chris@408 241 // system call to read it all as it may block too much
Chris@408 242
Chris@408 243 static const size_t blockSize = 256 * 1024;
Chris@408 244
Chris@408 245 size_t size = request.size;
Chris@408 246 char *destination = request.data;
Chris@408 247
Chris@408 248 while (size > 0) {
Chris@408 249 size_t readSize = size;
Chris@408 250 if (readSize > blockSize) readSize = blockSize;
Chris@408 251 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408 252 if (br < 0) {
Chris@408 253 r = br;
Chris@408 254 break;
Chris@408 255 } else {
Chris@408 256 r += br;
Chris@408 257 if (br < ssize_t(readSize)) break;
Chris@408 258 }
Chris@408 259 destination += readSize;
Chris@408 260 size -= readSize;
Chris@148 261 }
Chris@148 262 }
Chris@148 263 }
Chris@148 264
Chris@148 265 if (seekFailed) {
Chris@148 266 ::perror("Seek failed");
Chris@843 267 cerr << "ERROR: FileReadThread::process: seek to "
Chris@843 268 << request.start << " failed" << endl;
Chris@148 269 request.size = 0;
Chris@148 270 } else {
Chris@148 271 if (r < 0) {
Chris@148 272 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@843 273 cerr << "ERROR: FileReadThread::process: read of "
Chris@455 274 << request.size << " at "
Chris@843 275 << request.start << " failed" << endl;
Chris@148 276 request.size = 0;
Chris@148 277 } else if (r < ssize_t(request.size)) {
Chris@843 278 cerr << "WARNING: FileReadThread::process: read "
Chris@148 279 << request.size << " returned only " << r << " bytes"
Chris@843 280 << endl;
Chris@148 281 request.size = r;
Chris@148 282 usleep(100000);
Chris@148 283 } else {
Chris@148 284 successful = true;
Chris@148 285 }
Chris@148 286 }
Chris@148 287
Chris@148 288 // Check that the token hasn't been cancelled and the thread
Chris@148 289 // hasn't been asked to finish
Chris@148 290
Chris@148 291 m_mutex.lock();
Chris@148 292
Chris@148 293 request.successful = successful;
Chris@148 294
Chris@148 295 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 296 m_queue.erase(token);
Chris@148 297 m_readyRequests[token] = request;
Chris@148 298 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 299 SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
Chris@148 300 #endif
Chris@148 301 } else {
Chris@148 302 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 303 if (m_exiting) {
Chris@690 304 SVDEBUG << "FileReadThread::process: exiting" << endl;
Chris@455 305 } else {
Chris@690 306 SVDEBUG << "FileReadThread::process: request disappeared" << endl;
Chris@455 307 }
Chris@148 308 #endif
Chris@148 309 }
Chris@148 310 }
Chris@148 311
Chris@148 312 void
Chris@148 313 FileReadThread::notifyCancelled()
Chris@148 314 {
Chris@148 315 // entered with m_mutex locked
Chris@148 316
Chris@148 317 while (!m_newlyCancelled.empty()) {
Chris@148 318
Chris@148 319 int token = *m_newlyCancelled.begin();
Chris@148 320
Chris@148 321 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 322 SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
Chris@148 323 #endif
Chris@148 324
Chris@148 325 m_newlyCancelled.erase(token);
Chris@148 326 }
Chris@148 327 }
Chris@148 328
Chris@148 329