annotate data/fileio/FileReadThread.cpp @ 1434:0684c6698e3f streaming-csv-writer

Added utility function for splitting a model selection into chunks and writing to a stream.
author Lucas Thompson <dev@lucas.im>
date Tue, 17 Apr 2018 10:03:49 +0100 (2018-04-17)
parents c811991a5efa
children
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
Chris@148 16 #include "FileReadThread.h"
Chris@148 17
Chris@148 18 #include "base/Profiler.h"
Chris@408 19 #include "base/Thread.h"
Chris@148 20
Chris@148 21 #include <iostream>
Chris@1216 22
Chris@1216 23 #ifdef _MSC_VER
Chris@1216 24 #include <io.h>
Chris@1216 25 #define _lseek lseek
Chris@1216 26 #else
Chris@148 27 #include <unistd.h>
Chris@1216 28 #endif
Chris@1216 29
Chris@658 30 #include <cstdio>
Chris@148 31
Chris@455 32 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 33
Chris@148 34 FileReadThread::FileReadThread() :
Chris@148 35 m_nextToken(0),
Chris@148 36 m_exiting(false)
Chris@148 37 {
Chris@148 38 }
Chris@148 39
Chris@148 40 void
Chris@148 41 FileReadThread::run()
Chris@148 42 {
Chris@408 43 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148 44
Chris@148 45 while (!m_exiting) {
Chris@148 46 if (m_queue.empty()) {
Chris@148 47 m_condition.wait(&m_mutex, 1000);
Chris@148 48 } else {
Chris@148 49 process();
Chris@148 50 }
Chris@148 51 notifyCancelled();
Chris@148 52 }
Chris@148 53
Chris@148 54 notifyCancelled();
Chris@148 55
Chris@148 56 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 57 SVDEBUG << "FileReadThread::run() exiting" << endl;
Chris@148 58 #endif
Chris@148 59 }
Chris@148 60
Chris@148 61 void
Chris@148 62 FileReadThread::finish()
Chris@148 63 {
Chris@148 64 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 65 SVDEBUG << "FileReadThread::finish()" << endl;
Chris@148 66 #endif
Chris@148 67
Chris@408 68 {
Chris@408 69 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408 70
Chris@408 71 while (!m_queue.empty()) {
Chris@408 72 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408 73 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408 74 m_queue.erase(m_queue.begin());
Chris@408 75 }
Chris@408 76
Chris@408 77 m_exiting = true;
Chris@148 78 }
Chris@148 79
Chris@148 80 m_condition.wakeAll();
Chris@148 81
Chris@148 82 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 83 SVDEBUG << "FileReadThread::finish() exiting" << endl;
Chris@148 84 #endif
Chris@148 85 }
Chris@148 86
Chris@148 87 int
Chris@148 88 FileReadThread::request(const Request &request)
Chris@148 89 {
Chris@408 90 int token;
Chris@408 91
Chris@408 92 {
Chris@408 93 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148 94
Chris@408 95 token = m_nextToken++;
Chris@408 96 m_queue[token] = request;
Chris@408 97 }
Chris@148 98
Chris@148 99 m_condition.wakeAll();
Chris@148 100
Chris@148 101 return token;
Chris@148 102 }
Chris@148 103
Chris@148 104 void
Chris@148 105 FileReadThread::cancel(int token)
Chris@148 106 {
Chris@408 107 {
Chris@408 108 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148 109
Chris@408 110 if (m_queue.find(token) != m_queue.end()) {
Chris@408 111 m_cancelledRequests[token] = m_queue[token];
Chris@408 112 m_queue.erase(token);
Chris@408 113 m_newlyCancelled.insert(token);
Chris@408 114 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408 115 m_cancelledRequests[token] = m_readyRequests[token];
Chris@408 116 m_readyRequests.erase(token);
Chris@408 117 } else {
Chris@843 118 cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
Chris@408 119 }
Chris@148 120 }
Chris@148 121
Chris@148 122 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 123 SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
Chris@148 124 #endif
Chris@148 125
Chris@148 126 m_condition.wakeAll();
Chris@148 127 }
Chris@148 128
Chris@148 129 bool
Chris@148 130 FileReadThread::isReady(int token)
Chris@148 131 {
Chris@408 132 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148 133
Chris@148 134 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 135
Chris@148 136 return ready;
Chris@148 137 }
Chris@148 138
Chris@148 139 bool
Chris@148 140 FileReadThread::isCancelled(int token)
Chris@148 141 {
Chris@408 142 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148 143
Chris@148 144 bool cancelled =
Chris@148 145 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 146 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 147
Chris@148 148 return cancelled;
Chris@148 149 }
Chris@148 150
Chris@148 151 bool
Chris@455 152 FileReadThread::haveRequest(int token)
Chris@455 153 {
Chris@455 154 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
Chris@455 155
Chris@455 156 bool found = false;
Chris@455 157
Chris@455 158 if (m_queue.find(token) != m_queue.end()) {
Chris@455 159 found = true;
Chris@455 160 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@455 161 found = true;
Chris@455 162 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@455 163 found = true;
Chris@455 164 }
Chris@455 165
Chris@455 166 return found;
Chris@455 167 }
Chris@455 168
Chris@455 169 bool
Chris@148 170 FileReadThread::getRequest(int token, Request &request)
Chris@148 171 {
Chris@408 172 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148 173
Chris@148 174 bool found = false;
Chris@148 175
Chris@148 176 if (m_queue.find(token) != m_queue.end()) {
Chris@148 177 request = m_queue[token];
Chris@148 178 found = true;
Chris@148 179 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 180 request = m_cancelledRequests[token];
Chris@148 181 found = true;
Chris@148 182 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 183 request = m_readyRequests[token];
Chris@148 184 found = true;
Chris@148 185 }
Chris@148 186
Chris@148 187 return found;
Chris@148 188 }
Chris@148 189
Chris@148 190 void
Chris@148 191 FileReadThread::done(int token)
Chris@148 192 {
Chris@408 193 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148 194
Chris@148 195 bool found = false;
Chris@148 196
Chris@148 197 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 198 m_cancelledRequests.erase(token);
Chris@148 199 m_newlyCancelled.erase(token);
Chris@148 200 found = true;
Chris@148 201 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 202 m_readyRequests.erase(token);
Chris@148 203 found = true;
Chris@148 204 } else if (m_queue.find(token) != m_queue.end()) {
Chris@843 205 cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
Chris@148 206 }
Chris@148 207
Chris@148 208 if (!found) {
Chris@843 209 cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
Chris@148 210 }
Chris@148 211 }
Chris@148 212
Chris@148 213 void
Chris@148 214 FileReadThread::process()
Chris@148 215 {
Chris@148 216 // entered with m_mutex locked and m_queue non-empty
Chris@148 217
Chris@408 218 Profiler profiler("FileReadThread::process", true);
Chris@148 219
Chris@148 220 int token = m_queue.begin()->first;
Chris@148 221 Request request = m_queue.begin()->second;
Chris@148 222
Chris@148 223 m_mutex.unlock();
Chris@148 224
Chris@148 225 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 226 SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
Chris@148 227 #endif
Chris@148 228
Chris@148 229 bool successful = false;
Chris@148 230 bool seekFailed = false;
Chris@148 231 ssize_t r = 0;
Chris@148 232
Chris@408 233 {
Chris@408 234 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148 235
Chris@408 236 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408 237 seekFailed = true;
Chris@408 238 } else {
Chris@148 239
Chris@408 240 // if request.size is large, we want to avoid making a single
Chris@408 241 // system call to read it all as it may block too much
Chris@408 242
Chris@408 243 static const size_t blockSize = 256 * 1024;
Chris@408 244
Chris@408 245 size_t size = request.size;
Chris@408 246 char *destination = request.data;
Chris@408 247
Chris@408 248 while (size > 0) {
Chris@408 249 size_t readSize = size;
Chris@408 250 if (readSize > blockSize) readSize = blockSize;
Chris@408 251 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408 252 if (br < 0) {
Chris@408 253 r = br;
Chris@408 254 break;
Chris@408 255 } else {
Chris@408 256 r += br;
Chris@408 257 if (br < ssize_t(readSize)) break;
Chris@408 258 }
Chris@408 259 destination += readSize;
Chris@408 260 size -= readSize;
Chris@148 261 }
Chris@148 262 }
Chris@148 263 }
Chris@148 264
Chris@148 265 if (seekFailed) {
Chris@148 266 ::perror("Seek failed");
Chris@843 267 cerr << "ERROR: FileReadThread::process: seek to "
Chris@843 268 << request.start << " failed" << endl;
Chris@148 269 request.size = 0;
Chris@148 270 } else {
Chris@148 271 if (r < 0) {
Chris@148 272 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@843 273 cerr << "ERROR: FileReadThread::process: read of "
Chris@455 274 << request.size << " at "
Chris@843 275 << request.start << " failed" << endl;
Chris@148 276 request.size = 0;
Chris@148 277 } else if (r < ssize_t(request.size)) {
Chris@843 278 cerr << "WARNING: FileReadThread::process: read "
Chris@148 279 << request.size << " returned only " << r << " bytes"
Chris@843 280 << endl;
Chris@148 281 request.size = r;
Chris@148 282 usleep(100000);
Chris@148 283 } else {
Chris@148 284 successful = true;
Chris@148 285 }
Chris@148 286 }
Chris@148 287
Chris@148 288 // Check that the token hasn't been cancelled and the thread
Chris@148 289 // hasn't been asked to finish
Chris@148 290
Chris@148 291 m_mutex.lock();
Chris@148 292
Chris@148 293 request.successful = successful;
Chris@148 294
Chris@148 295 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 296 m_queue.erase(token);
Chris@148 297 m_readyRequests[token] = request;
Chris@148 298 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 299 SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
Chris@148 300 #endif
Chris@148 301 } else {
Chris@148 302 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 303 if (m_exiting) {
Chris@690 304 SVDEBUG << "FileReadThread::process: exiting" << endl;
Chris@455 305 } else {
Chris@690 306 SVDEBUG << "FileReadThread::process: request disappeared" << endl;
Chris@455 307 }
Chris@148 308 #endif
Chris@148 309 }
Chris@148 310 }
Chris@148 311
Chris@148 312 void
Chris@148 313 FileReadThread::notifyCancelled()
Chris@148 314 {
Chris@148 315 // entered with m_mutex locked
Chris@148 316
Chris@148 317 while (!m_newlyCancelled.empty()) {
Chris@148 318
Chris@148 319 int token = *m_newlyCancelled.begin();
Chris@148 320
Chris@148 321 #ifdef DEBUG_FILE_READ_THREAD
Chris@690 322 SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
Chris@148 323 #endif
Chris@148 324
Chris@148 325 m_newlyCancelled.erase(token);
Chris@148 326 }
Chris@148 327 }
Chris@148 328
Chris@148 329