annotate base/FileReadThread.cpp @ 101:ce1d385f4f89

* Use kill(pid, 0) instead of /proc or sysctl blather for looking up pids * Add OpenProcess call for Win32
author Chris Cannam
date Fri, 05 May 2006 12:34:51 +0000
parents 2b1a16e38d2d
children
rev   line source
Chris@95 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
Chris@95 2
Chris@95 3 /*
Chris@95 4 Sonic Visualiser
Chris@95 5 An audio file viewer and annotation editor.
Chris@95 6 Centre for Digital Music, Queen Mary, University of London.
Chris@95 7 This file copyright 2006 Chris Cannam.
Chris@95 8
Chris@95 9 This program is free software; you can redistribute it and/or
Chris@95 10 modify it under the terms of the GNU General Public License as
Chris@95 11 published by the Free Software Foundation; either version 2 of the
Chris@95 12 License, or (at your option) any later version. See the file
Chris@95 13 COPYING included with this distribution for more information.
Chris@95 14 */
Chris@95 15
Chris@95 16 #include "FileReadThread.h"
Chris@95 17
Chris@95 18 #include <iostream>
Chris@100 19 #include <unistd.h>
Chris@95 20
Chris@95 21 FileReadThread::FileReadThread() :
Chris@95 22 m_nextToken(0),
Chris@95 23 m_exiting(false)
Chris@95 24 {
Chris@95 25 }
Chris@95 26
Chris@95 27 void
Chris@95 28 FileReadThread::run()
Chris@95 29 {
Chris@95 30 m_mutex.lock();
Chris@95 31
Chris@95 32 while (!m_exiting) {
Chris@95 33 if (m_queue.empty()) {
Chris@95 34 m_condition.wait(&m_mutex, 1000);
Chris@95 35 } else {
Chris@95 36 process();
Chris@95 37 }
Chris@96 38 notifyCancelled();
Chris@95 39 }
Chris@95 40
Chris@95 41 notifyCancelled();
Chris@95 42 m_mutex.unlock();
Chris@95 43
Chris@95 44 std::cerr << "FileReadThread::run() exiting" << std::endl;
Chris@95 45 }
Chris@95 46
Chris@95 47 void
Chris@95 48 FileReadThread::finish()
Chris@95 49 {
Chris@95 50 std::cerr << "FileReadThread::finish()" << std::endl;
Chris@95 51
Chris@95 52 m_mutex.lock();
Chris@95 53 while (!m_queue.empty()) {
Chris@95 54 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
Chris@95 55 m_newlyCancelled.insert(m_queue.begin()->first);
Chris@95 56 m_queue.erase(m_queue.begin());
Chris@95 57 }
Chris@95 58
Chris@95 59 m_exiting = true;
Chris@95 60 m_mutex.unlock();
Chris@95 61
Chris@95 62 m_condition.wakeAll();
Chris@95 63
Chris@95 64 std::cerr << "FileReadThread::finish() exiting" << std::endl;
Chris@95 65 }
Chris@95 66
Chris@95 67 int
Chris@95 68 FileReadThread::request(const Request &request)
Chris@95 69 {
Chris@95 70 m_mutex.lock();
Chris@95 71
Chris@95 72 int token = m_nextToken++;
Chris@95 73 m_queue[token] = request;
Chris@95 74
Chris@95 75 m_mutex.unlock();
Chris@95 76 m_condition.wakeAll();
Chris@95 77
Chris@95 78 return token;
Chris@95 79 }
Chris@95 80
Chris@95 81 void
Chris@95 82 FileReadThread::cancel(int token)
Chris@95 83 {
Chris@95 84 m_mutex.lock();
Chris@95 85
Chris@95 86 if (m_queue.find(token) != m_queue.end()) {
Chris@95 87 m_cancelledRequests[token] = m_queue[token];
Chris@95 88 m_queue.erase(token);
Chris@95 89 m_newlyCancelled.insert(token);
Chris@96 90 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@96 91 m_cancelledRequests[token] = m_readyRequests[token];
Chris@96 92 m_readyRequests.erase(token);
Chris@96 93 } else {
Chris@96 94 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
Chris@95 95 }
Chris@95 96
Chris@95 97 m_mutex.unlock();
Chris@96 98
Chris@96 99 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
Chris@96 100
Chris@96 101 m_condition.wakeAll();
Chris@95 102 }
Chris@95 103
Chris@95 104 bool
Chris@95 105 FileReadThread::isReady(int token)
Chris@95 106 {
Chris@95 107 m_mutex.lock();
Chris@95 108
Chris@95 109 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
Chris@95 110
Chris@95 111 m_mutex.unlock();
Chris@95 112 return ready;
Chris@95 113 }
Chris@95 114
Chris@95 115 bool
Chris@95 116 FileReadThread::isCancelled(int token)
Chris@95 117 {
Chris@95 118 m_mutex.lock();
Chris@95 119
Chris@95 120 bool cancelled =
Chris@95 121 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
Chris@95 122 m_newlyCancelled.find(token) == m_newlyCancelled.end();
Chris@95 123
Chris@95 124 m_mutex.unlock();
Chris@95 125 return cancelled;
Chris@95 126 }
Chris@95 127
Chris@95 128 bool
Chris@95 129 FileReadThread::getRequest(int token, Request &request)
Chris@95 130 {
Chris@95 131 m_mutex.lock();
Chris@95 132
Chris@95 133 bool found = false;
Chris@95 134
Chris@95 135 if (m_queue.find(token) != m_queue.end()) {
Chris@95 136 request = m_queue[token];
Chris@95 137 found = true;
Chris@95 138 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@95 139 request = m_cancelledRequests[token];
Chris@95 140 found = true;
Chris@95 141 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@95 142 request = m_readyRequests[token];
Chris@95 143 found = true;
Chris@95 144 }
Chris@95 145
Chris@95 146 m_mutex.unlock();
Chris@95 147
Chris@95 148 return found;
Chris@95 149 }
Chris@95 150
Chris@95 151 void
Chris@95 152 FileReadThread::done(int token)
Chris@95 153 {
Chris@95 154 m_mutex.lock();
Chris@95 155
Chris@95 156 bool found = false;
Chris@95 157
Chris@95 158 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
Chris@95 159 m_cancelledRequests.erase(token);
Chris@95 160 m_newlyCancelled.erase(token);
Chris@95 161 found = true;
Chris@95 162 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
Chris@95 163 m_readyRequests.erase(token);
Chris@95 164 found = true;
Chris@95 165 } else if (m_queue.find(token) != m_queue.end()) {
Chris@95 166 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
Chris@95 167 }
Chris@95 168
Chris@95 169 m_mutex.unlock();
Chris@95 170
Chris@95 171 if (!found) {
Chris@95 172 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
Chris@95 173 }
Chris@95 174 }
Chris@95 175
Chris@95 176 void
Chris@95 177 FileReadThread::process()
Chris@95 178 {
Chris@95 179 // entered with m_mutex locked and m_queue non-empty
Chris@95 180
Chris@95 181 int token = m_queue.begin()->first;
Chris@95 182 Request request = m_queue.begin()->second;
Chris@95 183
Chris@95 184 m_mutex.unlock();
Chris@95 185
Chris@95 186 std::cerr << "FileReadThread::process: got something to do" << std::endl;
Chris@95 187
Chris@95 188 bool successful = false;
Chris@95 189 bool seekFailed = false;
Chris@95 190 ssize_t r = 0;
Chris@95 191
Chris@95 192 if (request.mutex) request.mutex->lock();
Chris@95 193
Chris@95 194 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
Chris@95 195 seekFailed = true;
Chris@95 196 } else {
Chris@96 197
Chris@96 198 // if request.size is large, we want to avoid making a single
Chris@96 199 // system call to read it all as it may block too much
Chris@96 200
Chris@96 201 static const size_t blockSize = 16384;
Chris@96 202
Chris@96 203 size_t size = request.size;
Chris@96 204 char *destination = request.data;
Chris@96 205
Chris@96 206 while (size > 0) {
Chris@96 207 size_t readSize = size;
Chris@96 208 if (readSize > blockSize) readSize = blockSize;
Chris@96 209 ssize_t br = ::read(request.fd, destination, readSize);
Chris@96 210 if (br < 0) {
Chris@96 211 r = br;
Chris@96 212 break;
Chris@96 213 } else {
Chris@96 214 r += br;
Chris@96 215 if (br < ssize_t(readSize)) break;
Chris@96 216 }
Chris@96 217 destination += readSize;
Chris@96 218 size -= readSize;
Chris@96 219 }
Chris@95 220 }
Chris@95 221
Chris@95 222 if (request.mutex) request.mutex->unlock();
Chris@95 223
Chris@95 224 if (seekFailed) {
Chris@95 225 ::perror("Seek failed");
Chris@95 226 std::cerr << "ERROR: FileReadThread::process: seek to "
Chris@95 227 << request.start << " failed" << std::endl;
Chris@95 228 request.size = 0;
Chris@95 229 } else {
Chris@95 230 if (r < 0) {
Chris@95 231 ::perror("ERROR: FileReadThread::process: Read failed");
Chris@95 232 request.size = 0;
Chris@95 233 } else if (r < ssize_t(request.size)) {
Chris@95 234 std::cerr << "WARNING: FileReadThread::process: read "
Chris@95 235 << request.size << " returned only " << r << " bytes"
Chris@95 236 << std::endl;
Chris@95 237 request.size = r;
Chris@95 238 } else {
Chris@95 239 successful = true;
Chris@95 240 }
Chris@95 241 }
Chris@95 242
Chris@95 243 // Check that the token hasn't been cancelled and the thread
Chris@95 244 // hasn't been asked to finish
Chris@95 245
Chris@95 246 m_mutex.lock();
Chris@95 247
Chris@95 248 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
Chris@95 249 m_queue.erase(token);
Chris@95 250 m_readyRequests[token] = request;
Chris@95 251 m_mutex.unlock();
Chris@95 252 std::cerr << "emitting" << std::endl;
Chris@95 253 emit ready(token, successful);
Chris@95 254 m_mutex.lock();
Chris@95 255 }
Chris@95 256 }
Chris@95 257
Chris@95 258 void
Chris@95 259 FileReadThread::notifyCancelled()
Chris@95 260 {
Chris@95 261 // entered with m_mutex locked
Chris@95 262
Chris@95 263 while (!m_newlyCancelled.empty()) {
Chris@96 264
Chris@95 265 int token = *m_newlyCancelled.begin();
Chris@96 266
Chris@96 267 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
Chris@96 268
Chris@95 269 m_newlyCancelled.erase(token);
Chris@95 270 emit cancelled(token);
Chris@95 271 }
Chris@95 272 }
Chris@95 273
Chris@95 274