annotate data/fileio/FileReadThread.cpp @ 489:82ab61fa9223

* Reorganise our sparql queries on the basis that Redland must be available, not only optional. So for anything querying the pool of data about plugins, we use a single datastore and model which is initialised at the outset by PluginRDFIndexer and then queried directly; for anything that "reads from a file" (e.g. loading annotations) we query directly using Rasqal, going to the datastore when we need additional plugin-related information. This may improve performance, but mostly it simplifies the code and fixes a serious issue with RDF import in the previous versions (namely that multiple sequential RDF imports would end up sharing the same RDF data pool!)
author Chris Cannam
date Fri, 21 Nov 2008 16:12:29 +0000
parents 3e0f1f7bec85
children 29efe322ab47
rev   line source
Chris@148 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@148 2
Chris@148 3 /*
Chris@148 4 Sonic Visualiser
Chris@148 5 An audio file viewer and annotation editor.
Chris@148 6 Centre for Digital Music, Queen Mary, University of London.
Chris@148 7 This file copyright 2006 Chris Cannam.
Chris@148 8
Chris@148 9 This program is free software; you can redistribute it and/or
Chris@148 10 modify it under the terms of the GNU General Public License as
Chris@148 11 published by the Free Software Foundation; either version 2 of the
Chris@148 12 License, or (at your option) any later version. See the file
Chris@148 13 COPYING included with this distribution for more information.
Chris@148 14 */
Chris@148 15
Chris@148 16 #include "FileReadThread.h"
Chris@148 17
Chris@148 18 #include "base/Profiler.h"
Chris@408 19 #include "base/Thread.h"
Chris@148 20
Chris@148 21 #include <iostream>
Chris@148 22 #include <unistd.h>
Chris@148 23
Chris@455 24 //#define DEBUG_FILE_READ_THREAD 1
Chris@148 25
Chris@148 26 FileReadThread::FileReadThread() :
Chris@148 27 m_nextToken(0),
Chris@148 28 m_exiting(false)
Chris@148 29 {
Chris@148 30 }
Chris@148 31
Chris@148 32 void
Chris@148 33 FileReadThread::run()
Chris@148 34 {
Chris@408 35 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
Chris@148 36
Chris@148 37 while (!m_exiting) {
Chris@148 38 if (m_queue.empty()) {
Chris@148 39 m_condition.wait(&m_mutex, 1000);
Chris@148 40 } else {
Chris@148 41 process();
Chris@148 42 }
Chris@148 43 notifyCancelled();
Chris@148 44 }
Chris@148 45
Chris@148 46 notifyCancelled();
Chris@148 47
Chris@148 48 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 49 std::cerr << "FileReadThread::run() exiting" << std::endl;
Chris@148 50 #endif
Chris@148 51 }
Chris@148 52
Chris@148 53 void
Chris@148 54 FileReadThread::finish()
Chris@148 55 {
Chris@148 56 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 57 std::cerr << "FileReadThread::finish()" << std::endl;
Chris@148 58 #endif
Chris@148 59
Chris@408 60 {
Chris@408 61 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
Chris@408 62
Chris@408 63 while (!m_queue.empty()) {
Chris@408 64 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@408 65 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@408 66 m_queue.erase(m_queue.begin());
Chris@408 67 }
Chris@408 68
Chris@408 69 m_exiting = true;
Chris@148 70 }
Chris@148 71
Chris@148 72 m_condition.wakeAll();
Chris@148 73
Chris@148 74 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 75 std::cerr << "FileReadThread::finish() exiting" << std::endl;
Chris@148 76 #endif
Chris@148 77 }
Chris@148 78
Chris@148 79 int
Chris@148 80 FileReadThread::request(const Request &request)
Chris@148 81 {
Chris@408 82 int token;
Chris@408 83
Chris@408 84 {
Chris@408 85 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
Chris@148 86
Chris@408 87 token = m_nextToken++;
Chris@408 88 m_queue[token] = request;
Chris@408 89 }
Chris@148 90
Chris@148 91 m_condition.wakeAll();
Chris@148 92
Chris@148 93 return token;
Chris@148 94 }
Chris@148 95
Chris@148 96 void
Chris@148 97 FileReadThread::cancel(int token)
Chris@148 98 {
Chris@408 99 {
Chris@408 100 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
Chris@148 101
Chris@408 102 if (m_queue.find(token) != m_queue.end()) {
Chris@408 103 m_cancelledRequests[token] = m_queue[token];
Chris@408 104 m_queue.erase(token);
Chris@408 105 m_newlyCancelled.insert(token);
Chris@408 106 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@408 107 m_cancelledRequests[token] = m_readyRequests[token];
Chris@408 108 m_readyRequests.erase(token);
Chris@408 109 } else {
Chris@408 110 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
Chris@408 111 }
Chris@148 112 }
Chris@148 113
Chris@148 114 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 115 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
Chris@148 116 #endif
Chris@148 117
Chris@148 118 m_condition.wakeAll();
Chris@148 119 }
Chris@148 120
Chris@148 121 bool
Chris@148 122 FileReadThread::isReady(int token)
Chris@148 123 {
Chris@408 124 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
Chris@148 125
Chris@148 126 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@148 127
Chris@148 128 return ready;
Chris@148 129 }
Chris@148 130
Chris@148 131 bool
Chris@148 132 FileReadThread::isCancelled(int token)
Chris@148 133 {
Chris@408 134 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
Chris@148 135
Chris@148 136 bool cancelled =
Chris@148 137 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@148 138 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@148 139
Chris@148 140 return cancelled;
Chris@148 141 }
Chris@148 142
Chris@148 143 bool
Chris@455 144 FileReadThread::haveRequest(int token)
Chris@455 145 {
Chris@455 146 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
Chris@455 147
Chris@455 148 bool found = false;
Chris@455 149
Chris@455 150 if (m_queue.find(token) != m_queue.end()) {
Chris@455 151 found = true;
Chris@455 152 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@455 153 found = true;
Chris@455 154 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@455 155 found = true;
Chris@455 156 }
Chris@455 157
Chris@455 158 return found;
Chris@455 159 }
Chris@455 160
Chris@455 161 bool
Chris@148 162 FileReadThread::getRequest(int token, Request &request)
Chris@148 163 {
Chris@408 164 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
Chris@148 165
Chris@148 166 bool found = false;
Chris@148 167
Chris@148 168 if (m_queue.find(token) != m_queue.end()) {
Chris@148 169 request = m_queue[token];
Chris@148 170 found = true;
Chris@148 171 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 172 request = m_cancelledRequests[token];
Chris@148 173 found = true;
Chris@148 174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 175 request = m_readyRequests[token];
Chris@148 176 found = true;
Chris@148 177 }
Chris@148 178
Chris@148 179 return found;
Chris@148 180 }
Chris@148 181
Chris@148 182 void
Chris@148 183 FileReadThread::done(int token)
Chris@148 184 {
Chris@408 185 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
Chris@148 186
Chris@148 187 bool found = false;
Chris@148 188
Chris@148 189 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@148 190 m_cancelledRequests.erase(token);
Chris@148 191 m_newlyCancelled.erase(token);
Chris@148 192 found = true;
Chris@148 193 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@148 194 m_readyRequests.erase(token);
Chris@148 195 found = true;
Chris@148 196 } else if (m_queue.find(token) != m_queue.end()) {
Chris@148 197 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
Chris@148 198 }
Chris@148 199
Chris@148 200 if (!found) {
Chris@148 201 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
Chris@148 202 }
Chris@148 203 }
Chris@148 204
Chris@148 205 void
Chris@148 206 FileReadThread::process()
Chris@148 207 {
Chris@148 208 // entered with m_mutex locked and m_queue non-empty
Chris@148 209
Chris@408 210 Profiler profiler("FileReadThread::process", true);
Chris@148 211
Chris@148 212 int token = m_queue.begin()->first;
Chris@148 213 Request request = m_queue.begin()->second;
Chris@148 214
Chris@148 215 m_mutex.unlock();
Chris@148 216
Chris@148 217 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 218 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
Chris@148 219 #endif
Chris@148 220
Chris@148 221 bool successful = false;
Chris@148 222 bool seekFailed = false;
Chris@148 223 ssize_t r = 0;
Chris@148 224
Chris@408 225 {
Chris@408 226 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
Chris@148 227
Chris@408 228 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@408 229 seekFailed = true;
Chris@408 230 } else {
Chris@148 231
Chris@408 232 // if request.size is large, we want to avoid making a single
Chris@408 233 // system call to read it all as it may block too much
Chris@408 234
Chris@408 235 static const size_t blockSize = 256 * 1024;
Chris@408 236
Chris@408 237 size_t size = request.size;
Chris@408 238 char *destination = request.data;
Chris@408 239
Chris@408 240 while (size > 0) {
Chris@408 241 size_t readSize = size;
Chris@408 242 if (readSize > blockSize) readSize = blockSize;
Chris@408 243 ssize_t br = ::read(request.fd, destination, readSize);
Chris@408 244 if (br < 0) {
Chris@408 245 r = br;
Chris@408 246 break;
Chris@408 247 } else {
Chris@408 248 r += br;
Chris@408 249 if (br < ssize_t(readSize)) break;
Chris@408 250 }
Chris@408 251 destination += readSize;
Chris@408 252 size -= readSize;
Chris@148 253 }
Chris@148 254 }
Chris@148 255 }
Chris@148 256
Chris@148 257 if (seekFailed) {
Chris@148 258 ::perror("Seek failed");
Chris@148 259 std::cerr << "ERROR: FileReadThread::process: seek to "
Chris@148 260 << request.start << " failed" << std::endl;
Chris@148 261 request.size = 0;
Chris@148 262 } else {
Chris@148 263 if (r < 0) {
Chris@148 264 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@455 265 std::cerr << "ERROR: FileReadThread::process: read of "
Chris@455 266 << request.size << " at "
Chris@455 267 << request.start << " failed" << std::endl;
Chris@148 268 request.size = 0;
Chris@148 269 } else if (r < ssize_t(request.size)) {
Chris@148 270 std::cerr << "WARNING: FileReadThread::process: read "
Chris@148 271 << request.size << " returned only " << r << " bytes"
Chris@148 272 << std::endl;
Chris@148 273 request.size = r;
Chris@148 274 usleep(100000);
Chris@148 275 } else {
Chris@148 276 successful = true;
Chris@148 277 }
Chris@148 278 }
Chris@148 279
Chris@148 280 // Check that the token hasn't been cancelled and the thread
Chris@148 281 // hasn't been asked to finish
Chris@148 282
Chris@148 283 m_mutex.lock();
Chris@148 284
Chris@148 285 request.successful = successful;
Chris@148 286
Chris@148 287 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@148 288 m_queue.erase(token);
Chris@148 289 m_readyRequests[token] = request;
Chris@148 290 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 291 std::cerr << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << std::endl;
Chris@148 292 #endif
Chris@148 293 } else {
Chris@148 294 #ifdef DEBUG_FILE_READ_THREAD
Chris@455 295 if (m_exiting) {
Chris@455 296 std::cerr << "FileReadThread::process: exiting" << std::endl;
Chris@455 297 } else {
Chris@455 298 std::cerr << "FileReadThread::process: request disappeared" << std::endl;
Chris@455 299 }
Chris@148 300 #endif
Chris@148 301 }
Chris@148 302 }
Chris@148 303
Chris@148 304 void
Chris@148 305 FileReadThread::notifyCancelled()
Chris@148 306 {
Chris@148 307 // entered with m_mutex locked
Chris@148 308
Chris@148 309 while (!m_newlyCancelled.empty()) {
Chris@148 310
Chris@148 311 int token = *m_newlyCancelled.begin();
Chris@148 312
Chris@148 313 #ifdef DEBUG_FILE_READ_THREAD
Chris@148 314 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
Chris@148 315 #endif
Chris@148 316
Chris@148 317 m_newlyCancelled.erase(token);
Chris@148 318 }
Chris@148 319 }
Chris@148 320
Chris@148 321