annotate data/fileio/FileReadThread.cpp @ 1008:d9e0e59a1581

When using an aggregate model to pass data to a transform, zero-pad the shorter input to the duration of the longer rather than truncating the longer. (This is better behaviour for e.g. MATCH, and in any case the code was previously truncating incorrectly and ending up with garbage data at the end.)
author Chris Cannam
date Fri, 14 Nov 2014 13:51:33 +0000
parents e802e550a1f2
children c811991a5efa
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
#include "FileReadThread.h"

#include "base/Profiler.h"
#include "base/Thread.h"

#include <iostream>
#include <unistd.h>
#include <cerrno>
#include <cstdio>
Chris@148 24
Chris@455 25 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 26
Chris@148 27 FileReadThread::FileReadThread() :
Chris@148 28 m_nextToken(0),
Chris@148 29 m_exiting(false)
Chris@148 30 {
Chris@148 31 }
Chris@148 32
Chris@148 33 void
Chris@148 34 FileReadThread::run()
Chris@148 35 {
Chris@408 36 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148 37
Chris@148 38 while (!m_exiting) {
Chris@148 39 if (m_queue.empty()) {
Chris@148 40 m_condition.wait(&m_mutex, 1000);
Chris@148 41 } else {
Chris@148 42 process();
Chris@148 43 }
Chris@148 44 notifyCancelled();
Chris@148 45 }
Chris@148 46
Chris@148 47 notifyCancelled();
Chris@148 48
Chris@148 49 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 50 SVDEBUG << "FileReadThread::run() exiting" << endl;
Chris@148 51 #endif
Chris@148 52 }
Chris@148 53
Chris@148 54 void
Chris@148 55 FileReadThread::finish()
Chris@148 56 {
Chris@148 57 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 58 SVDEBUG << "FileReadThread::finish()" << endl;
Chris@148 59 #endif
Chris@148 60
Chris@408 61 {
Chris@408 62 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408 63
Chris@408 64 while (!m_queue.empty()) {
Chris@408 65 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408 66 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408 67 m_queue.erase(m_queue.begin());
Chris@408 68 }
Chris@408 69
Chris@408 70 m_exiting = true;
Chris@148 71 }
Chris@148 72
Chris@148 73 m_condition.wakeAll();
Chris@148 74
Chris@148 75 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 76 SVDEBUG << "FileReadThread::finish() exiting" << endl;
Chris@148 77 #endif
Chris@148 78 }
Chris@148 79
Chris@148 80 int
Chris@148 81 FileReadThread::request(const Request &request)
Chris@148 82 {
Chris@408 83 int token;
Chris@408 84
Chris@408 85 {
Chris@408 86 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148 87
Chris@408 88 token = m_nextToken++;
Chris@408 89 m_queue[token] = request;
Chris@408 90 }
Chris@148 91
Chris@148 92 m_condition.wakeAll();
Chris@148 93
Chris@148 94 return token;
Chris@148 95 }
Chris@148 96
Chris@148 97 void
Chris@148 98 FileReadThread::cancel(int token)
Chris@148 99 {
Chris@408 100 {
Chris@408 101 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148 102
Chris@408 103 if (m_queue.find(token) != m_queue.end()) {
Chris@408 104 m_cancelledRequests[token] = m_queue[token];
Chris@408 105 m_queue.erase(token);
Chris@408 106 m_newlyCancelled.insert(token);
Chris@408 107 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408 108 m_cancelledRequests[token] = m_readyRequests[token];
Chris@408 109 m_readyRequests.erase(token);
Chris@408 110 } else {
Chris@843 111 cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
Chris@408 112 }
Chris@148 113 }
Chris@148 114
Chris@148 115 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 116 SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
Chris@148 117 #endif
Chris@148 118
Chris@148 119 m_condition.wakeAll();
Chris@148 120 }
Chris@148 121
Chris@148 122 bool
Chris@148 123 FileReadThread::isReady(int token)
Chris@148 124 {
Chris@408 125 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148 126
Chris@148 127 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 128
Chris@148 129 return ready;
Chris@148 130 }
Chris@148 131
Chris@148 132 bool
Chris@148 133 FileReadThread::isCancelled(int token)
Chris@148 134 {
Chris@408 135 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148 136
Chris@148 137 bool cancelled =
Chris@148 138 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 139 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 140
Chris@148 141 return cancelled;
Chris@148 142 }
Chris@148 143
Chris@148 144 bool
Chris@455 145 FileReadThread::haveRequest(int token)
Chris@455 146 {
Chris@455 147 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
Chris@455 148
Chris@455 149 bool found = false;
Chris@455 150
Chris@455 151 if (m_queue.find(token) != m_queue.end()) {
Chris@455 152 found = true;
Chris@455 153 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@455 154 found = true;
Chris@455 155 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@455 156 found = true;
Chris@455 157 }
Chris@455 158
Chris@455 159 return found;
Chris@455 160 }
Chris@455 161
Chris@455 162 bool
Chris@148 163 FileReadThread::getRequest(int token, Request &request)
Chris@148 164 {
Chris@408 165 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148 166
Chris@148 167 bool found = false;
Chris@148 168
Chris@148 169 if (m_queue.find(token) != m_queue.end()) {
Chris@148 170 request = m_queue[token];
Chris@148 171 found = true;
Chris@148 172 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 173 request = m_cancelledRequests[token];
Chris@148 174 found = true;
Chris@148 175 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 176 request = m_readyRequests[token];
Chris@148 177 found = true;
Chris@148 178 }
Chris@148 179
Chris@148 180 return found;
Chris@148 181 }
Chris@148 182
Chris@148 183 void
Chris@148 184 FileReadThread::done(int token)
Chris@148 185 {
Chris@408 186 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148 187
Chris@148 188 bool found = false;
Chris@148 189
Chris@148 190 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 191 m_cancelledRequests.erase(token);
Chris@148 192 m_newlyCancelled.erase(token);
Chris@148 193 found = true;
Chris@148 194 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 195 m_readyRequests.erase(token);
Chris@148 196 found = true;
Chris@148 197 } else if (m_queue.find(token) != m_queue.end()) {
Chris@843 198 cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
Chris@148 199 }
Chris@148 200
Chris@148 201 if (!found) {
Chris@843 202 cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
Chris@148 203 }
Chris@148 204 }
Chris@148 205
Chris@148 206 void
Chris@148 207 FileReadThread::process()
Chris@148 208 {
Chris@148 209 // entered with m_mutex locked and m_queue non-empty
Chris@148 210
Chris@408 211 Profiler profiler("FileReadThread::process", true);
Chris@148 212
Chris@148 213 int token = m_queue.begin()->first;
Chris@148 214 Request request = m_queue.begin()->second;
Chris@148 215
Chris@148 216 m_mutex.unlock();
Chris@148 217
Chris@148 218 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 219 SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
Chris@148 220 #endif
Chris@148 221
Chris@148 222 bool successful = false;
Chris@148 223 bool seekFailed = false;
Chris@148 224 ssize_t r = 0;
Chris@148 225
Chris@408 226 {
Chris@408 227 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148 228
Chris@408 229 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408 230 seekFailed = true;
Chris@408 231 } else {
Chris@148 232
Chris@408 233 // if request.size is large, we want to avoid making a single
Chris@408 234 // system call to read it all as it may block too much
Chris@408 235
Chris@408 236 static const size_t blockSize = 256 * 1024;
Chris@408 237
Chris@408 238 size_t size = request.size;
Chris@408 239 char *destination = request.data;
Chris@408 240
Chris@408 241 while (size > 0) {
Chris@408 242 size_t readSize = size;
Chris@408 243 if (readSize > blockSize) readSize = blockSize;
Chris@408 244 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408 245 if (br < 0) {
Chris@408 246 r = br;
Chris@408 247 break;
Chris@408 248 } else {
Chris@408 249 r += br;
Chris@408 250 if (br < ssize_t(readSize)) break;
Chris@408 251 }
Chris@408 252 destination += readSize;
Chris@408 253 size -= readSize;
Chris@148 254 }
Chris@148 255 }
Chris@148 256 }
Chris@148 257
Chris@148 258 if (seekFailed) {
Chris@148 259 ::perror("Seek failed");
Chris@843 260 cerr << "ERROR: FileReadThread::process: seek to "
Chris@843 261 << request.start << " failed" << endl;
Chris@148 262 request.size = 0;
Chris@148 263 } else {
Chris@148 264 if (r < 0) {
Chris@148 265 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@843 266 cerr << "ERROR: FileReadThread::process: read of "
Chris@455 267 << request.size << " at "
Chris@843 268 << request.start << " failed" << endl;
Chris@148 269 request.size = 0;
Chris@148 270 } else if (r < ssize_t(request.size)) {
Chris@843 271 cerr << "WARNING: FileReadThread::process: read "
Chris@148 272 << request.size << " returned only " << r << " bytes"
Chris@843 273 << endl;
Chris@148 274 request.size = r;
Chris@148 275 usleep(100000);
Chris@148 276 } else {
Chris@148 277 successful = true;
Chris@148 278 }
Chris@148 279 }
Chris@148 280
Chris@148 281 // Check that the token hasn't been cancelled and the thread
Chris@148 282 // hasn't been asked to finish
Chris@148 283
Chris@148 284 m_mutex.lock();
Chris@148 285
Chris@148 286 request.successful = successful;
Chris@148 287
Chris@148 288 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 289 m_queue.erase(token);
Chris@148 290 m_readyRequests[token] = request;
Chris@148 291 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 292 SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
Chris@148 293 #endif
Chris@148 294 } else {
Chris@148 295 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 296 if (m_exiting) {
Chris@690 297 SVDEBUG << "FileReadThread::process: exiting" << endl;
Chris@455 298 } else {
Chris@690 299 SVDEBUG << "FileReadThread::process: request disappeared" << endl;
Chris@455 300 }
Chris@148 301 #endif
Chris@148 302 }
Chris@148 303 }
Chris@148 304
Chris@148 305 void
Chris@148 306 FileReadThread::notifyCancelled()
Chris@148 307 {
Chris@148 308 // entered with m_mutex locked
Chris@148 309
Chris@148 310 while (!m_newlyCancelled.empty()) {
Chris@148 311
Chris@148 312 int token = *m_newlyCancelled.begin();
Chris@148 313
Chris@148 314 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 315 SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
Chris@148 316 #endif
Chris@148 317
Chris@148 318 m_newlyCancelled.erase(token);
Chris@148 319 }
Chris@148 320 }
Chris@148 321
Chris@148 322