Chris@148
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
Chris@148
|
2
|
Chris@148
|
3 /*
|
Chris@148
|
4 Sonic Visualiser
|
Chris@148
|
5 An audio file viewer and annotation editor.
|
Chris@148
|
6 Centre for Digital Music, Queen Mary, University of London.
|
Chris@148
|
7 This file copyright 2006 Chris Cannam.
|
Chris@148
|
8
|
Chris@148
|
9 This program is free software; you can redistribute it and/or
|
Chris@148
|
10 modify it under the terms of the GNU General Public License as
|
Chris@148
|
11 published by the Free Software Foundation; either version 2 of the
|
Chris@148
|
12 License, or (at your option) any later version. See the file
|
Chris@148
|
13 COPYING included with this distribution for more information.
|
Chris@148
|
14 */
|
Chris@148
|
15
|
Chris@148
|
16 #include "FileReadThread.h"
|
Chris@148
|
17
|
Chris@148
|
18 #include "base/Profiler.h"
|
Chris@408
|
19 #include "base/Thread.h"
|
Chris@148
|
20
|
Chris@148
|
21 #include <iostream>
|
Chris@148
|
22 #include <unistd.h>
|
Chris@658
|
23 #include <cstdio>
|
Chris@148
|
24
|
Chris@455
|
25 //#define DEBUG_FILE_READ_THREAD 1
|
Chris@148
|
26
|
Chris@148
|
27 FileReadThread::FileReadThread() :
|
Chris@148
|
28 m_nextToken(0),
|
Chris@148
|
29 m_exiting(false)
|
Chris@148
|
30 {
|
Chris@148
|
31 }
|
Chris@148
|
32
|
Chris@148
|
33 void
|
Chris@148
|
34 FileReadThread::run()
|
Chris@148
|
35 {
|
Chris@408
|
36 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
|
Chris@148
|
37
|
Chris@148
|
38 while (!m_exiting) {
|
Chris@148
|
39 if (m_queue.empty()) {
|
Chris@148
|
40 m_condition.wait(&m_mutex, 1000);
|
Chris@148
|
41 } else {
|
Chris@148
|
42 process();
|
Chris@148
|
43 }
|
Chris@148
|
44 notifyCancelled();
|
Chris@148
|
45 }
|
Chris@148
|
46
|
Chris@148
|
47 notifyCancelled();
|
Chris@148
|
48
|
Chris@148
|
49 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
50 SVDEBUG << "FileReadThread::run() exiting" << endl;
|
Chris@148
|
51 #endif
|
Chris@148
|
52 }
|
Chris@148
|
53
|
Chris@148
|
54 void
|
Chris@148
|
55 FileReadThread::finish()
|
Chris@148
|
56 {
|
Chris@148
|
57 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
58 SVDEBUG << "FileReadThread::finish()" << endl;
|
Chris@148
|
59 #endif
|
Chris@148
|
60
|
Chris@408
|
61 {
|
Chris@408
|
62 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
|
Chris@408
|
63
|
Chris@408
|
64 while (!m_queue.empty()) {
|
Chris@408
|
65 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
|
Chris@408
|
66 m_newlyCancelled.insert(m_queue.begin()->first);
|
Chris@408
|
67 m_queue.erase(m_queue.begin());
|
Chris@408
|
68 }
|
Chris@408
|
69
|
Chris@408
|
70 m_exiting = true;
|
Chris@148
|
71 }
|
Chris@148
|
72
|
Chris@148
|
73 m_condition.wakeAll();
|
Chris@148
|
74
|
Chris@148
|
75 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
76 SVDEBUG << "FileReadThread::finish() exiting" << endl;
|
Chris@148
|
77 #endif
|
Chris@148
|
78 }
|
Chris@148
|
79
|
Chris@148
|
80 int
|
Chris@148
|
81 FileReadThread::request(const Request &request)
|
Chris@148
|
82 {
|
Chris@408
|
83 int token;
|
Chris@408
|
84
|
Chris@408
|
85 {
|
Chris@408
|
86 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
|
Chris@148
|
87
|
Chris@408
|
88 token = m_nextToken++;
|
Chris@408
|
89 m_queue[token] = request;
|
Chris@408
|
90 }
|
Chris@148
|
91
|
Chris@148
|
92 m_condition.wakeAll();
|
Chris@148
|
93
|
Chris@148
|
94 return token;
|
Chris@148
|
95 }
|
Chris@148
|
96
|
Chris@148
|
97 void
|
Chris@148
|
98 FileReadThread::cancel(int token)
|
Chris@148
|
99 {
|
Chris@408
|
100 {
|
Chris@408
|
101 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
|
Chris@148
|
102
|
Chris@408
|
103 if (m_queue.find(token) != m_queue.end()) {
|
Chris@408
|
104 m_cancelledRequests[token] = m_queue[token];
|
Chris@408
|
105 m_queue.erase(token);
|
Chris@408
|
106 m_newlyCancelled.insert(token);
|
Chris@408
|
107 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@408
|
108 m_cancelledRequests[token] = m_readyRequests[token];
|
Chris@408
|
109 m_readyRequests.erase(token);
|
Chris@408
|
110 } else {
|
Chris@408
|
111 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
|
Chris@408
|
112 }
|
Chris@148
|
113 }
|
Chris@148
|
114
|
Chris@148
|
115 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
116 SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
|
Chris@148
|
117 #endif
|
Chris@148
|
118
|
Chris@148
|
119 m_condition.wakeAll();
|
Chris@148
|
120 }
|
Chris@148
|
121
|
Chris@148
|
122 bool
|
Chris@148
|
123 FileReadThread::isReady(int token)
|
Chris@148
|
124 {
|
Chris@408
|
125 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
|
Chris@148
|
126
|
Chris@148
|
127 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
|
Chris@148
|
128
|
Chris@148
|
129 return ready;
|
Chris@148
|
130 }
|
Chris@148
|
131
|
Chris@148
|
132 bool
|
Chris@148
|
133 FileReadThread::isCancelled(int token)
|
Chris@148
|
134 {
|
Chris@408
|
135 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
|
Chris@148
|
136
|
Chris@148
|
137 bool cancelled =
|
Chris@148
|
138 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
|
Chris@148
|
139 m_newlyCancelled.find(token) == m_newlyCancelled.end();
|
Chris@148
|
140
|
Chris@148
|
141 return cancelled;
|
Chris@148
|
142 }
|
Chris@148
|
143
|
Chris@148
|
144 bool
|
Chris@455
|
145 FileReadThread::haveRequest(int token)
|
Chris@455
|
146 {
|
Chris@455
|
147 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
|
Chris@455
|
148
|
Chris@455
|
149 bool found = false;
|
Chris@455
|
150
|
Chris@455
|
151 if (m_queue.find(token) != m_queue.end()) {
|
Chris@455
|
152 found = true;
|
Chris@455
|
153 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@455
|
154 found = true;
|
Chris@455
|
155 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@455
|
156 found = true;
|
Chris@455
|
157 }
|
Chris@455
|
158
|
Chris@455
|
159 return found;
|
Chris@455
|
160 }
|
Chris@455
|
161
|
Chris@455
|
162 bool
|
Chris@148
|
163 FileReadThread::getRequest(int token, Request &request)
|
Chris@148
|
164 {
|
Chris@408
|
165 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
|
Chris@148
|
166
|
Chris@148
|
167 bool found = false;
|
Chris@148
|
168
|
Chris@148
|
169 if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
170 request = m_queue[token];
|
Chris@148
|
171 found = true;
|
Chris@148
|
172 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
173 request = m_cancelledRequests[token];
|
Chris@148
|
174 found = true;
|
Chris@148
|
175 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
176 request = m_readyRequests[token];
|
Chris@148
|
177 found = true;
|
Chris@148
|
178 }
|
Chris@148
|
179
|
Chris@148
|
180 return found;
|
Chris@148
|
181 }
|
Chris@148
|
182
|
Chris@148
|
183 void
|
Chris@148
|
184 FileReadThread::done(int token)
|
Chris@148
|
185 {
|
Chris@408
|
186 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
|
Chris@148
|
187
|
Chris@148
|
188 bool found = false;
|
Chris@148
|
189
|
Chris@148
|
190 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
191 m_cancelledRequests.erase(token);
|
Chris@148
|
192 m_newlyCancelled.erase(token);
|
Chris@148
|
193 found = true;
|
Chris@148
|
194 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
195 m_readyRequests.erase(token);
|
Chris@148
|
196 found = true;
|
Chris@148
|
197 } else if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
198 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
|
Chris@148
|
199 }
|
Chris@148
|
200
|
Chris@148
|
201 if (!found) {
|
Chris@148
|
202 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
|
Chris@148
|
203 }
|
Chris@148
|
204 }
|
Chris@148
|
205
|
Chris@148
|
206 void
|
Chris@148
|
207 FileReadThread::process()
|
Chris@148
|
208 {
|
Chris@148
|
209 // entered with m_mutex locked and m_queue non-empty
|
Chris@148
|
210
|
Chris@408
|
211 Profiler profiler("FileReadThread::process", true);
|
Chris@148
|
212
|
Chris@148
|
213 int token = m_queue.begin()->first;
|
Chris@148
|
214 Request request = m_queue.begin()->second;
|
Chris@148
|
215
|
Chris@148
|
216 m_mutex.unlock();
|
Chris@148
|
217
|
Chris@148
|
218 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
219 SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
|
Chris@148
|
220 #endif
|
Chris@148
|
221
|
Chris@148
|
222 bool successful = false;
|
Chris@148
|
223 bool seekFailed = false;
|
Chris@148
|
224 ssize_t r = 0;
|
Chris@148
|
225
|
Chris@408
|
226 {
|
Chris@408
|
227 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
|
Chris@148
|
228
|
Chris@408
|
229 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
|
Chris@408
|
230 seekFailed = true;
|
Chris@408
|
231 } else {
|
Chris@148
|
232
|
Chris@408
|
233 // if request.size is large, we want to avoid making a single
|
Chris@408
|
234 // system call to read it all as it may block too much
|
Chris@408
|
235
|
Chris@408
|
236 static const size_t blockSize = 256 * 1024;
|
Chris@408
|
237
|
Chris@408
|
238 size_t size = request.size;
|
Chris@408
|
239 char *destination = request.data;
|
Chris@408
|
240
|
Chris@408
|
241 while (size > 0) {
|
Chris@408
|
242 size_t readSize = size;
|
Chris@408
|
243 if (readSize > blockSize) readSize = blockSize;
|
Chris@408
|
244 ssize_t br = ::read(request.fd, destination, readSize);
|
Chris@408
|
245 if (br < 0) {
|
Chris@408
|
246 r = br;
|
Chris@408
|
247 break;
|
Chris@408
|
248 } else {
|
Chris@408
|
249 r += br;
|
Chris@408
|
250 if (br < ssize_t(readSize)) break;
|
Chris@408
|
251 }
|
Chris@408
|
252 destination += readSize;
|
Chris@408
|
253 size -= readSize;
|
Chris@148
|
254 }
|
Chris@148
|
255 }
|
Chris@148
|
256 }
|
Chris@148
|
257
|
Chris@148
|
258 if (seekFailed) {
|
Chris@148
|
259 ::perror("Seek failed");
|
Chris@148
|
260 std::cerr << "ERROR: FileReadThread::process: seek to "
|
Chris@148
|
261 << request.start << " failed" << std::endl;
|
Chris@148
|
262 request.size = 0;
|
Chris@148
|
263 } else {
|
Chris@148
|
264 if (r < 0) {
|
Chris@148
|
265 ::perror("ERROR: FileReadThread::process: Read failed");
|
Chris@455
|
266 std::cerr << "ERROR: FileReadThread::process: read of "
|
Chris@455
|
267 << request.size << " at "
|
Chris@455
|
268 << request.start << " failed" << std::endl;
|
Chris@148
|
269 request.size = 0;
|
Chris@148
|
270 } else if (r < ssize_t(request.size)) {
|
Chris@148
|
271 std::cerr << "WARNING: FileReadThread::process: read "
|
Chris@148
|
272 << request.size << " returned only " << r << " bytes"
|
Chris@148
|
273 << std::endl;
|
Chris@148
|
274 request.size = r;
|
Chris@148
|
275 usleep(100000);
|
Chris@148
|
276 } else {
|
Chris@148
|
277 successful = true;
|
Chris@148
|
278 }
|
Chris@148
|
279 }
|
Chris@148
|
280
|
Chris@148
|
281 // Check that the token hasn't been cancelled and the thread
|
Chris@148
|
282 // hasn't been asked to finish
|
Chris@148
|
283
|
Chris@148
|
284 m_mutex.lock();
|
Chris@148
|
285
|
Chris@148
|
286 request.successful = successful;
|
Chris@148
|
287
|
Chris@148
|
288 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
|
Chris@148
|
289 m_queue.erase(token);
|
Chris@148
|
290 m_readyRequests[token] = request;
|
Chris@148
|
291 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
292 SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
|
Chris@148
|
293 #endif
|
Chris@148
|
294 } else {
|
Chris@148
|
295 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@455
|
296 if (m_exiting) {
|
Chris@690
|
297 SVDEBUG << "FileReadThread::process: exiting" << endl;
|
Chris@455
|
298 } else {
|
Chris@690
|
299 SVDEBUG << "FileReadThread::process: request disappeared" << endl;
|
Chris@455
|
300 }
|
Chris@148
|
301 #endif
|
Chris@148
|
302 }
|
Chris@148
|
303 }
|
Chris@148
|
304
|
Chris@148
|
305 void
|
Chris@148
|
306 FileReadThread::notifyCancelled()
|
Chris@148
|
307 {
|
Chris@148
|
308 // entered with m_mutex locked
|
Chris@148
|
309
|
Chris@148
|
310 while (!m_newlyCancelled.empty()) {
|
Chris@148
|
311
|
Chris@148
|
312 int token = *m_newlyCancelled.begin();
|
Chris@148
|
313
|
Chris@148
|
314 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@690
|
315 SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
|
Chris@148
|
316 #endif
|
Chris@148
|
317
|
Chris@148
|
318 m_newlyCancelled.erase(token);
|
Chris@148
|
319 }
|
Chris@148
|
320 }
|
Chris@148
|
321
|
Chris@148
|
322
|