Chris@148
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
Chris@148
|
2
|
Chris@148
|
3 /*
|
Chris@148
|
4 Sonic Visualiser
|
Chris@148
|
5 An audio file viewer and annotation editor.
|
Chris@148
|
6 Centre for Digital Music, Queen Mary, University of London.
|
Chris@148
|
7 This file copyright 2006 Chris Cannam.
|
Chris@148
|
8
|
Chris@148
|
9 This program is free software; you can redistribute it and/or
|
Chris@148
|
10 modify it under the terms of the GNU General Public License as
|
Chris@148
|
11 published by the Free Software Foundation; either version 2 of the
|
Chris@148
|
12 License, or (at your option) any later version. See the file
|
Chris@148
|
13 COPYING included with this distribution for more information.
|
Chris@148
|
14 */
|
Chris@148
|
15
|
Chris@148
|
16 #include "FileReadThread.h"
|
Chris@148
|
17
|
Chris@148
|
18 #include "base/Profiler.h"
|
Chris@408
|
19 #include "base/Thread.h"
|
Chris@148
|
20
|
Chris@148
|
21 #include <iostream>
|
Chris@148
|
22 #include <unistd.h>
|
Chris@148
|
23
|
Chris@455
|
24 //#define DEBUG_FILE_READ_THREAD 1
|
Chris@148
|
25
|
Chris@148
|
26 FileReadThread::FileReadThread() :
|
Chris@148
|
27 m_nextToken(0),
|
Chris@148
|
28 m_exiting(false)
|
Chris@148
|
29 {
|
Chris@148
|
30 }
|
Chris@148
|
31
|
Chris@148
|
32 void
|
Chris@148
|
33 FileReadThread::run()
|
Chris@148
|
34 {
|
Chris@408
|
35 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
|
Chris@148
|
36
|
Chris@148
|
37 while (!m_exiting) {
|
Chris@148
|
38 if (m_queue.empty()) {
|
Chris@148
|
39 m_condition.wait(&m_mutex, 1000);
|
Chris@148
|
40 } else {
|
Chris@148
|
41 process();
|
Chris@148
|
42 }
|
Chris@148
|
43 notifyCancelled();
|
Chris@148
|
44 }
|
Chris@148
|
45
|
Chris@148
|
46 notifyCancelled();
|
Chris@148
|
47
|
Chris@148
|
48 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
49 std::cerr << "FileReadThread::run() exiting" << std::endl;
|
Chris@148
|
50 #endif
|
Chris@148
|
51 }
|
Chris@148
|
52
|
Chris@148
|
53 void
|
Chris@148
|
54 FileReadThread::finish()
|
Chris@148
|
55 {
|
Chris@148
|
56 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
57 std::cerr << "FileReadThread::finish()" << std::endl;
|
Chris@148
|
58 #endif
|
Chris@148
|
59
|
Chris@408
|
60 {
|
Chris@408
|
61 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
|
Chris@408
|
62
|
Chris@408
|
63 while (!m_queue.empty()) {
|
Chris@408
|
64 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
|
Chris@408
|
65 m_newlyCancelled.insert(m_queue.begin()->first);
|
Chris@408
|
66 m_queue.erase(m_queue.begin());
|
Chris@408
|
67 }
|
Chris@408
|
68
|
Chris@408
|
69 m_exiting = true;
|
Chris@148
|
70 }
|
Chris@148
|
71
|
Chris@148
|
72 m_condition.wakeAll();
|
Chris@148
|
73
|
Chris@148
|
74 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
75 std::cerr << "FileReadThread::finish() exiting" << std::endl;
|
Chris@148
|
76 #endif
|
Chris@148
|
77 }
|
Chris@148
|
78
|
Chris@148
|
79 int
|
Chris@148
|
80 FileReadThread::request(const Request &request)
|
Chris@148
|
81 {
|
Chris@408
|
82 int token;
|
Chris@408
|
83
|
Chris@408
|
84 {
|
Chris@408
|
85 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
|
Chris@148
|
86
|
Chris@408
|
87 token = m_nextToken++;
|
Chris@408
|
88 m_queue[token] = request;
|
Chris@408
|
89 }
|
Chris@148
|
90
|
Chris@148
|
91 m_condition.wakeAll();
|
Chris@148
|
92
|
Chris@148
|
93 return token;
|
Chris@148
|
94 }
|
Chris@148
|
95
|
Chris@148
|
96 void
|
Chris@148
|
97 FileReadThread::cancel(int token)
|
Chris@148
|
98 {
|
Chris@408
|
99 {
|
Chris@408
|
100 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
|
Chris@148
|
101
|
Chris@408
|
102 if (m_queue.find(token) != m_queue.end()) {
|
Chris@408
|
103 m_cancelledRequests[token] = m_queue[token];
|
Chris@408
|
104 m_queue.erase(token);
|
Chris@408
|
105 m_newlyCancelled.insert(token);
|
Chris@408
|
106 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@408
|
107 m_cancelledRequests[token] = m_readyRequests[token];
|
Chris@408
|
108 m_readyRequests.erase(token);
|
Chris@408
|
109 } else {
|
Chris@408
|
110 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
|
Chris@408
|
111 }
|
Chris@148
|
112 }
|
Chris@148
|
113
|
Chris@148
|
114 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
115 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
|
Chris@148
|
116 #endif
|
Chris@148
|
117
|
Chris@148
|
118 m_condition.wakeAll();
|
Chris@148
|
119 }
|
Chris@148
|
120
|
Chris@148
|
121 bool
|
Chris@148
|
122 FileReadThread::isReady(int token)
|
Chris@148
|
123 {
|
Chris@408
|
124 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
|
Chris@148
|
125
|
Chris@148
|
126 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
|
Chris@148
|
127
|
Chris@148
|
128 return ready;
|
Chris@148
|
129 }
|
Chris@148
|
130
|
Chris@148
|
131 bool
|
Chris@148
|
132 FileReadThread::isCancelled(int token)
|
Chris@148
|
133 {
|
Chris@408
|
134 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
|
Chris@148
|
135
|
Chris@148
|
136 bool cancelled =
|
Chris@148
|
137 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
|
Chris@148
|
138 m_newlyCancelled.find(token) == m_newlyCancelled.end();
|
Chris@148
|
139
|
Chris@148
|
140 return cancelled;
|
Chris@148
|
141 }
|
Chris@148
|
142
|
Chris@148
|
143 bool
|
Chris@455
|
144 FileReadThread::haveRequest(int token)
|
Chris@455
|
145 {
|
Chris@455
|
146 MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
|
Chris@455
|
147
|
Chris@455
|
148 bool found = false;
|
Chris@455
|
149
|
Chris@455
|
150 if (m_queue.find(token) != m_queue.end()) {
|
Chris@455
|
151 found = true;
|
Chris@455
|
152 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@455
|
153 found = true;
|
Chris@455
|
154 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@455
|
155 found = true;
|
Chris@455
|
156 }
|
Chris@455
|
157
|
Chris@455
|
158 return found;
|
Chris@455
|
159 }
|
Chris@455
|
160
|
Chris@455
|
161 bool
|
Chris@148
|
162 FileReadThread::getRequest(int token, Request &request)
|
Chris@148
|
163 {
|
Chris@408
|
164 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
|
Chris@148
|
165
|
Chris@148
|
166 bool found = false;
|
Chris@148
|
167
|
Chris@148
|
168 if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
169 request = m_queue[token];
|
Chris@148
|
170 found = true;
|
Chris@148
|
171 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
172 request = m_cancelledRequests[token];
|
Chris@148
|
173 found = true;
|
Chris@148
|
174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
175 request = m_readyRequests[token];
|
Chris@148
|
176 found = true;
|
Chris@148
|
177 }
|
Chris@148
|
178
|
Chris@148
|
179 return found;
|
Chris@148
|
180 }
|
Chris@148
|
181
|
Chris@148
|
182 void
|
Chris@148
|
183 FileReadThread::done(int token)
|
Chris@148
|
184 {
|
Chris@408
|
185 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
|
Chris@148
|
186
|
Chris@148
|
187 bool found = false;
|
Chris@148
|
188
|
Chris@148
|
189 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
190 m_cancelledRequests.erase(token);
|
Chris@148
|
191 m_newlyCancelled.erase(token);
|
Chris@148
|
192 found = true;
|
Chris@148
|
193 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
194 m_readyRequests.erase(token);
|
Chris@148
|
195 found = true;
|
Chris@148
|
196 } else if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
197 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
|
Chris@148
|
198 }
|
Chris@148
|
199
|
Chris@148
|
200 if (!found) {
|
Chris@148
|
201 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
|
Chris@148
|
202 }
|
Chris@148
|
203 }
|
Chris@148
|
204
|
Chris@148
|
205 void
|
Chris@148
|
206 FileReadThread::process()
|
Chris@148
|
207 {
|
Chris@148
|
208 // entered with m_mutex locked and m_queue non-empty
|
Chris@148
|
209
|
Chris@408
|
210 Profiler profiler("FileReadThread::process", true);
|
Chris@148
|
211
|
Chris@148
|
212 int token = m_queue.begin()->first;
|
Chris@148
|
213 Request request = m_queue.begin()->second;
|
Chris@148
|
214
|
Chris@148
|
215 m_mutex.unlock();
|
Chris@148
|
216
|
Chris@148
|
217 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
218 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
|
Chris@148
|
219 #endif
|
Chris@148
|
220
|
Chris@148
|
221 bool successful = false;
|
Chris@148
|
222 bool seekFailed = false;
|
Chris@148
|
223 ssize_t r = 0;
|
Chris@148
|
224
|
Chris@408
|
225 {
|
Chris@408
|
226 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
|
Chris@148
|
227
|
Chris@408
|
228 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
|
Chris@408
|
229 seekFailed = true;
|
Chris@408
|
230 } else {
|
Chris@148
|
231
|
Chris@408
|
232 // if request.size is large, we want to avoid making a single
|
Chris@408
|
233 // system call to read it all as it may block too much
|
Chris@408
|
234
|
Chris@408
|
235 static const size_t blockSize = 256 * 1024;
|
Chris@408
|
236
|
Chris@408
|
237 size_t size = request.size;
|
Chris@408
|
238 char *destination = request.data;
|
Chris@408
|
239
|
Chris@408
|
240 while (size > 0) {
|
Chris@408
|
241 size_t readSize = size;
|
Chris@408
|
242 if (readSize > blockSize) readSize = blockSize;
|
Chris@408
|
243 ssize_t br = ::read(request.fd, destination, readSize);
|
Chris@408
|
244 if (br < 0) {
|
Chris@408
|
245 r = br;
|
Chris@408
|
246 break;
|
Chris@408
|
247 } else {
|
Chris@408
|
248 r += br;
|
Chris@408
|
249 if (br < ssize_t(readSize)) break;
|
Chris@408
|
250 }
|
Chris@408
|
251 destination += readSize;
|
Chris@408
|
252 size -= readSize;
|
Chris@148
|
253 }
|
Chris@148
|
254 }
|
Chris@148
|
255 }
|
Chris@148
|
256
|
Chris@148
|
257 if (seekFailed) {
|
Chris@148
|
258 ::perror("Seek failed");
|
Chris@148
|
259 std::cerr << "ERROR: FileReadThread::process: seek to "
|
Chris@148
|
260 << request.start << " failed" << std::endl;
|
Chris@148
|
261 request.size = 0;
|
Chris@148
|
262 } else {
|
Chris@148
|
263 if (r < 0) {
|
Chris@148
|
264 ::perror("ERROR: FileReadThread::process: Read failed");
|
Chris@455
|
265 std::cerr << "ERROR: FileReadThread::process: read of "
|
Chris@455
|
266 << request.size << " at "
|
Chris@455
|
267 << request.start << " failed" << std::endl;
|
Chris@148
|
268 request.size = 0;
|
Chris@148
|
269 } else if (r < ssize_t(request.size)) {
|
Chris@148
|
270 std::cerr << "WARNING: FileReadThread::process: read "
|
Chris@148
|
271 << request.size << " returned only " << r << " bytes"
|
Chris@148
|
272 << std::endl;
|
Chris@148
|
273 request.size = r;
|
Chris@148
|
274 usleep(100000);
|
Chris@148
|
275 } else {
|
Chris@148
|
276 successful = true;
|
Chris@148
|
277 }
|
Chris@148
|
278 }
|
Chris@148
|
279
|
Chris@148
|
280 // Check that the token hasn't been cancelled and the thread
|
Chris@148
|
281 // hasn't been asked to finish
|
Chris@148
|
282
|
Chris@148
|
283 m_mutex.lock();
|
Chris@148
|
284
|
Chris@148
|
285 request.successful = successful;
|
Chris@148
|
286
|
Chris@148
|
287 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
|
Chris@148
|
288 m_queue.erase(token);
|
Chris@148
|
289 m_readyRequests[token] = request;
|
Chris@148
|
290 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@455
|
291 std::cerr << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << std::endl;
|
Chris@148
|
292 #endif
|
Chris@148
|
293 } else {
|
Chris@148
|
294 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@455
|
295 if (m_exiting) {
|
Chris@455
|
296 std::cerr << "FileReadThread::process: exiting" << std::endl;
|
Chris@455
|
297 } else {
|
Chris@455
|
298 std::cerr << "FileReadThread::process: request disappeared" << std::endl;
|
Chris@455
|
299 }
|
Chris@148
|
300 #endif
|
Chris@148
|
301 }
|
Chris@148
|
302 }
|
Chris@148
|
303
|
Chris@148
|
304 void
|
Chris@148
|
305 FileReadThread::notifyCancelled()
|
Chris@148
|
306 {
|
Chris@148
|
307 // entered with m_mutex locked
|
Chris@148
|
308
|
Chris@148
|
309 while (!m_newlyCancelled.empty()) {
|
Chris@148
|
310
|
Chris@148
|
311 int token = *m_newlyCancelled.begin();
|
Chris@148
|
312
|
Chris@148
|
313 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
314 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
|
Chris@148
|
315 #endif
|
Chris@148
|
316
|
Chris@148
|
317 m_newlyCancelled.erase(token);
|
Chris@148
|
318 }
|
Chris@148
|
319 }
|
Chris@148
|
320
|
Chris@148
|
321
|