Chris@148
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
Chris@148
|
2
|
Chris@148
|
3 /*
|
Chris@148
|
4 Sonic Visualiser
|
Chris@148
|
5 An audio file viewer and annotation editor.
|
Chris@148
|
6 Centre for Digital Music, Queen Mary, University of London.
|
Chris@148
|
7 This file copyright 2006 Chris Cannam.
|
Chris@148
|
8
|
Chris@148
|
9 This program is free software; you can redistribute it and/or
|
Chris@148
|
10 modify it under the terms of the GNU General Public License as
|
Chris@148
|
11 published by the Free Software Foundation; either version 2 of the
|
Chris@148
|
12 License, or (at your option) any later version. See the file
|
Chris@148
|
13 COPYING included with this distribution for more information.
|
Chris@148
|
14 */
|
Chris@148
|
15
|
Chris@148
|
16 #include "FileReadThread.h"
|
Chris@148
|
17
|
Chris@148
|
18 #include "base/Profiler.h"
|
Chris@408
|
19 #include "base/Thread.h"
|
Chris@148
|
20
|
Chris@148
|
21 #include <iostream>
|
Chris@1216
|
22
|
Chris@1216
|
23 #ifdef _MSC_VER
|
Chris@1216
|
24 #include <io.h>
|
Chris@1216
|
25 #define _lseek lseek
|
Chris@1216
|
26 #else
|
Chris@148
|
27 #include <unistd.h>
|
Chris@1216
|
28 #endif
|
Chris@1216
|
29
|
Chris@658
|
30 #include <cstdio>
|
Chris@148
|
31
|
Chris@455
|
32 //#define DEBUG_FILE_READ_THREAD 1
|
Chris@148
|
33
|
Chris@148
|
34 FileReadThread::FileReadThread() :
|
Chris@148
|
35 m_nextToken(0),
|
Chris@148
|
36 m_exiting(false)
|
Chris@148
|
37 {
|
Chris@148
|
38 }
|
Chris@148
|
39
|
Chris@148
|
40 void
|
Chris@148
|
41 FileReadThread::run()
|
Chris@148
|
42 {
|
Chris@408
|
43 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
|
Chris@148
|
44
|
Chris@148
|
45 while (!m_exiting) {
|
Chris@148
|
46 if (m_queue.empty()) {
|
Chris@148
|
47 m_condition.wait(&m_mutex, 1000);
|
Chris@148
|
48 } else {
|
Chris@148
|
49 process();
|
Chris@148
|
50 }
|
Chris@148
|
51 notifyCancelled();
|
Chris@148
|
52 }
|
Chris@148
|
53
|
Chris@148
|
54 notifyCancelled();
|
Chris@148
|
55
|
Chris@148
|
56 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
57 SVDEBUG << "FileReadThread::run() exiting" << endl;
|
Chris@148
|
58 #endif
|
Chris@148
|
59 }
|
Chris@148
|
60
|
Chris@148
|
61 void
|
Chris@148
|
62 FileReadThread::finish()
|
Chris@148
|
63 {
|
Chris@148
|
64 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
65 SVDEBUG << "FileReadThread::finish()" << endl;
|
Chris@148
|
66 #endif
|
Chris@148
|
67
|
Chris@408
|
68 {
|
Chris@408
|
69 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
|
Chris@408
|
70
|
Chris@408
|
71 while (!m_queue.empty()) {
|
Chris@408
|
72 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
|
Chris@408
|
73 m_newlyCancelled.insert(m_queue.begin()->first);
|
Chris@408
|
74 m_queue.erase(m_queue.begin());
|
Chris@408
|
75 }
|
Chris@408
|
76
|
Chris@408
|
77 m_exiting = true;
|
Chris@148
|
78 }
|
Chris@148
|
79
|
Chris@148
|
80 m_condition.wakeAll();
|
Chris@148
|
81
|
Chris@148
|
82 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
83 SVDEBUG << "FileReadThread::finish() exiting" << endl;
|
Chris@148
|
84 #endif
|
Chris@148
|
85 }
|
Chris@148
|
86
|
Chris@148
|
87 int
|
Chris@148
|
88 FileReadThread::request(const Request &request)
|
Chris@148
|
89 {
|
Chris@408
|
90 int token;
|
Chris@408
|
91
|
Chris@408
|
92 {
|
Chris@408
|
93 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
|
Chris@148
|
94
|
Chris@408
|
95 token = m_nextToken++;
|
Chris@408
|
96 m_queue[token] = request;
|
Chris@408
|
97 }
|
Chris@148
|
98
|
Chris@148
|
99 m_condition.wakeAll();
|
Chris@148
|
100
|
Chris@148
|
101 return token;
|
Chris@148
|
102 }
|
Chris@148
|
103
|
Chris@148
|
104 void
|
Chris@148
|
105 FileReadThread::cancel(int token)
|
Chris@148
|
106 {
|
Chris@408
|
107 {
|
Chris@408
|
108 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
|
Chris@148
|
109
|
Chris@408
|
110 if (m_queue.find(token) != m_queue.end()) {
|
Chris@408
|
111 m_cancelledRequests[token] = m_queue[token];
|
Chris@408
|
112 m_queue.erase(token);
|
Chris@408
|
113 m_newlyCancelled.insert(token);
|
Chris@408
|
114 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@408
|
115 m_cancelledRequests[token] = m_readyRequests[token];
|
Chris@408
|
116 m_readyRequests.erase(token);
|
Chris@408
|
117 } else {
|
Chris@843
|
118 cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
|
Chris@408
|
119 }
|
Chris@148
|
120 }
|
Chris@148
|
121
|
Chris@148
|
122 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
123 SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
|
Chris@148
|
124 #endif
|
Chris@148
|
125
|
Chris@148
|
126 m_condition.wakeAll();
|
Chris@148
|
127 }
|
Chris@148
|
128
|
Chris@148
|
129 bool
|
Chris@148
|
130 FileReadThread::isReady(int token)
|
Chris@148
|
131 {
|
Chris@408
|
132 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
|
Chris@148
|
133
|
Chris@148
|
134 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
|
Chris@148
|
135
|
Chris@148
|
136 return ready;
|
Chris@148
|
137 }
|
Chris@148
|
138
|
Chris@148
|
139 bool
|
Chris@148
|
140 FileReadThread::isCancelled(int token)
|
Chris@148
|
141 {
|
Chris@408
|
142 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
|
Chris@148
|
143
|
Chris@148
|
144 bool cancelled =
|
Chris@148
|
145 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
|
Chris@148
|
146 m_newlyCancelled.find(token) == m_newlyCancelled.end();
|
Chris@148
|
147
|
Chris@148
|
148 return cancelled;
|
Chris@148
|
149 }
|
Chris@148
|
150
|
Chris@148
|
151 bool
|
Chris@455
|
152 FileReadThread::haveRequest(int token)
|
Chris@455
|
153 {
|
Chris@455
|
154 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
|
Chris@455
|
155
|
Chris@455
|
156 bool found = false;
|
Chris@455
|
157
|
Chris@455
|
158 if (m_queue.find(token) != m_queue.end()) {
|
Chris@455
|
159 found = true;
|
Chris@455
|
160 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@455
|
161 found = true;
|
Chris@455
|
162 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@455
|
163 found = true;
|
Chris@455
|
164 }
|
Chris@455
|
165
|
Chris@455
|
166 return found;
|
Chris@455
|
167 }
|
Chris@455
|
168
|
Chris@455
|
169 bool
|
Chris@148
|
170 FileReadThread::getRequest(int token, Request &request)
|
Chris@148
|
171 {
|
Chris@408
|
172 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
|
Chris@148
|
173
|
Chris@148
|
174 bool found = false;
|
Chris@148
|
175
|
Chris@148
|
176 if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
177 request = m_queue[token];
|
Chris@148
|
178 found = true;
|
Chris@148
|
179 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
180 request = m_cancelledRequests[token];
|
Chris@148
|
181 found = true;
|
Chris@148
|
182 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
183 request = m_readyRequests[token];
|
Chris@148
|
184 found = true;
|
Chris@148
|
185 }
|
Chris@148
|
186
|
Chris@148
|
187 return found;
|
Chris@148
|
188 }
|
Chris@148
|
189
|
Chris@148
|
190 void
|
Chris@148
|
191 FileReadThread::done(int token)
|
Chris@148
|
192 {
|
Chris@408
|
193 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
|
Chris@148
|
194
|
Chris@148
|
195 bool found = false;
|
Chris@148
|
196
|
Chris@148
|
197 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
198 m_cancelledRequests.erase(token);
|
Chris@148
|
199 m_newlyCancelled.erase(token);
|
Chris@148
|
200 found = true;
|
Chris@148
|
201 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
202 m_readyRequests.erase(token);
|
Chris@148
|
203 found = true;
|
Chris@148
|
204 } else if (m_queue.find(token) != m_queue.end()) {
|
Chris@843
|
205 cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
|
Chris@148
|
206 }
|
Chris@148
|
207
|
Chris@148
|
208 if (!found) {
|
Chris@843
|
209 cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
|
Chris@148
|
210 }
|
Chris@148
|
211 }
|
Chris@148
|
212
|
Chris@148
|
213 void
|
Chris@148
|
214 FileReadThread::process()
|
Chris@148
|
215 {
|
Chris@148
|
216 // entered with m_mutex locked and m_queue non-empty
|
Chris@148
|
217
|
Chris@408
|
218 Profiler profiler("FileReadThread::process", true);
|
Chris@148
|
219
|
Chris@148
|
220 int token = m_queue.begin()->first;
|
Chris@148
|
221 Request request = m_queue.begin()->second;
|
Chris@148
|
222
|
Chris@148
|
223 m_mutex.unlock();
|
Chris@148
|
224
|
Chris@148
|
225 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
226 SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
|
Chris@148
|
227 #endif
|
Chris@148
|
228
|
Chris@148
|
229 bool successful = false;
|
Chris@148
|
230 bool seekFailed = false;
|
Chris@148
|
231 ssize_t r = 0;
|
Chris@148
|
232
|
Chris@408
|
233 {
|
Chris@408
|
234 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
|
Chris@148
|
235
|
Chris@408
|
236 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
|
Chris@408
|
237 seekFailed = true;
|
Chris@408
|
238 } else {
|
Chris@148
|
239
|
Chris@408
|
240 // if request.size is large, we want to avoid making a single
|
Chris@408
|
241 // system call to read it all as it may block too much
|
Chris@408
|
242
|
Chris@408
|
243 static const size_t blockSize = 256 * 1024;
|
Chris@408
|
244
|
Chris@408
|
245 size_t size = request.size;
|
Chris@408
|
246 char *destination = request.data;
|
Chris@408
|
247
|
Chris@408
|
248 while (size > 0) {
|
Chris@408
|
249 size_t readSize = size;
|
Chris@408
|
250 if (readSize > blockSize) readSize = blockSize;
|
Chris@408
|
251 ssize_t br = ::read(request.fd, destination, readSize);
|
Chris@408
|
252 if (br < 0) {
|
Chris@408
|
253 r = br;
|
Chris@408
|
254 break;
|
Chris@408
|
255 } else {
|
Chris@408
|
256 r += br;
|
Chris@408
|
257 if (br < ssize_t(readSize)) break;
|
Chris@408
|
258 }
|
Chris@408
|
259 destination += readSize;
|
Chris@408
|
260 size -= readSize;
|
Chris@148
|
261 }
|
Chris@148
|
262 }
|
Chris@148
|
263 }
|
Chris@148
|
264
|
Chris@148
|
265 if (seekFailed) {
|
Chris@148
|
266 ::perror("Seek failed");
|
Chris@843
|
267 cerr << "ERROR: FileReadThread::process: seek to "
|
Chris@843
|
268 << request.start << " failed" << endl;
|
Chris@148
|
269 request.size = 0;
|
Chris@148
|
270 } else {
|
Chris@148
|
271 if (r < 0) {
|
Chris@148
|
272 ::perror("ERROR: FileReadThread::process: Read failed");
|
Chris@843
|
273 cerr << "ERROR: FileReadThread::process: read of "
|
Chris@455
|
274 << request.size << " at "
|
Chris@843
|
275 << request.start << " failed" << endl;
|
Chris@148
|
276 request.size = 0;
|
Chris@148
|
277 } else if (r < ssize_t(request.size)) {
|
Chris@843
|
278 cerr << "WARNING: FileReadThread::process: read "
|
Chris@148
|
279 << request.size << " returned only " << r << " bytes"
|
Chris@843
|
280 << endl;
|
Chris@148
|
281 request.size = r;
|
Chris@148
|
282 usleep(100000);
|
Chris@148
|
283 } else {
|
Chris@148
|
284 successful = true;
|
Chris@148
|
285 }
|
Chris@148
|
286 }
|
Chris@148
|
287
|
Chris@148
|
288 // Check that the token hasn't been cancelled and the thread
|
Chris@148
|
289 // hasn't been asked to finish
|
Chris@148
|
290
|
Chris@148
|
291 m_mutex.lock();
|
Chris@148
|
292
|
Chris@148
|
293 request.successful = successful;
|
Chris@148
|
294
|
Chris@148
|
295 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
|
Chris@148
|
296 m_queue.erase(token);
|
Chris@148
|
297 m_readyRequests[token] = request;
|
Chris@148
|
298 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
299 SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
|
Chris@148
|
300 #endif
|
Chris@148
|
301 } else {
|
Chris@148
|
302 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@455
|
303 if (m_exiting) {
|
Chris@690
|
304 SVDEBUG << "FileReadThread::process: exiting" << endl;
|
Chris@455
|
305 } else {
|
Chris@690
|
306 SVDEBUG << "FileReadThread::process: request disappeared" << endl;
|
Chris@455
|
307 }
|
Chris@148
|
308 #endif
|
Chris@148
|
309 }
|
Chris@148
|
310 }
|
Chris@148
|
311
|
Chris@148
|
312 void
|
Chris@148
|
313 FileReadThread::notifyCancelled()
|
Chris@148
|
314 {
|
Chris@148
|
315 // entered with m_mutex locked
|
Chris@148
|
316
|
Chris@148
|
317 while (!m_newlyCancelled.empty()) {
|
Chris@148
|
318
|
Chris@148
|
319 int token = *m_newlyCancelled.begin();
|
Chris@148
|
320
|
Chris@148
|
321 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
322 SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
|
Chris@148
|
323 #endif
|
Chris@148
|
324
|
Chris@148
|
325 m_newlyCancelled.erase(token);
|
Chris@148
|
326 }
|
Chris@148
|
327 }
|
Chris@148
|
328
|
Chris@148
|
329
|