Chris@148
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
Chris@148
|
2
|
Chris@148
|
3 /*
|
Chris@148
|
4 Sonic Visualiser
|
Chris@148
|
5 An audio file viewer and annotation editor.
|
Chris@148
|
6 Centre for Digital Music, Queen Mary, University of London.
|
Chris@148
|
7 This file copyright 2006 Chris Cannam.
|
Chris@148
|
8
|
Chris@148
|
9 This program is free software; you can redistribute it and/or
|
Chris@148
|
10 modify it under the terms of the GNU General Public License as
|
Chris@148
|
11 published by the Free Software Foundation; either version 2 of the
|
Chris@148
|
12 License, or (at your option) any later version. See the file
|
Chris@148
|
13 COPYING included with this distribution for more information.
|
Chris@148
|
14 */
|
Chris@148
|
15
|
Chris@148
|
16 #include "FileReadThread.h"
|
Chris@148
|
17
|
Chris@148
|
18 #include "base/Profiler.h"
|
Chris@148
|
19
|
Chris@148
|
20 #include <iostream>
|
Chris@148
|
21 #include <unistd.h>
|
Chris@148
|
22
|
Chris@148
|
23 //#define DEBUG_FILE_READ_THREAD 1
|
Chris@148
|
24
|
Chris@148
|
25 FileReadThread::FileReadThread() :
|
Chris@148
|
26 m_nextToken(0),
|
Chris@148
|
27 m_exiting(false)
|
Chris@148
|
28 {
|
Chris@148
|
29 }
|
Chris@148
|
30
|
Chris@148
|
31 void
|
Chris@148
|
32 FileReadThread::run()
|
Chris@148
|
33 {
|
Chris@148
|
34 m_mutex.lock();
|
Chris@148
|
35
|
Chris@148
|
36 while (!m_exiting) {
|
Chris@148
|
37 if (m_queue.empty()) {
|
Chris@148
|
38 m_condition.wait(&m_mutex, 1000);
|
Chris@148
|
39 } else {
|
Chris@148
|
40 process();
|
Chris@148
|
41 }
|
Chris@148
|
42 notifyCancelled();
|
Chris@148
|
43 }
|
Chris@148
|
44
|
Chris@148
|
45 notifyCancelled();
|
Chris@148
|
46 m_mutex.unlock();
|
Chris@148
|
47
|
Chris@148
|
48 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
49 std::cerr << "FileReadThread::run() exiting" << std::endl;
|
Chris@148
|
50 #endif
|
Chris@148
|
51 }
|
Chris@148
|
52
|
Chris@148
|
53 void
|
Chris@148
|
54 FileReadThread::finish()
|
Chris@148
|
55 {
|
Chris@148
|
56 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
57 std::cerr << "FileReadThread::finish()" << std::endl;
|
Chris@148
|
58 #endif
|
Chris@148
|
59
|
Chris@148
|
60 m_mutex.lock();
|
Chris@148
|
61 while (!m_queue.empty()) {
|
Chris@148
|
62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
|
Chris@148
|
63 m_newlyCancelled.insert(m_queue.begin()->first);
|
Chris@148
|
64 m_queue.erase(m_queue.begin());
|
Chris@148
|
65 }
|
Chris@148
|
66
|
Chris@148
|
67 m_exiting = true;
|
Chris@148
|
68 m_mutex.unlock();
|
Chris@148
|
69
|
Chris@148
|
70 m_condition.wakeAll();
|
Chris@148
|
71
|
Chris@148
|
72 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
73 std::cerr << "FileReadThread::finish() exiting" << std::endl;
|
Chris@148
|
74 #endif
|
Chris@148
|
75 }
|
Chris@148
|
76
|
Chris@148
|
77 int
|
Chris@148
|
78 FileReadThread::request(const Request &request)
|
Chris@148
|
79 {
|
Chris@148
|
80 m_mutex.lock();
|
Chris@148
|
81
|
Chris@148
|
82 int token = m_nextToken++;
|
Chris@148
|
83 m_queue[token] = request;
|
Chris@148
|
84
|
Chris@148
|
85 m_mutex.unlock();
|
Chris@148
|
86 m_condition.wakeAll();
|
Chris@148
|
87
|
Chris@148
|
88 return token;
|
Chris@148
|
89 }
|
Chris@148
|
90
|
Chris@148
|
91 void
|
Chris@148
|
92 FileReadThread::cancel(int token)
|
Chris@148
|
93 {
|
Chris@148
|
94 m_mutex.lock();
|
Chris@148
|
95
|
Chris@148
|
96 if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
97 m_cancelledRequests[token] = m_queue[token];
|
Chris@148
|
98 m_queue.erase(token);
|
Chris@148
|
99 m_newlyCancelled.insert(token);
|
Chris@148
|
100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
101 m_cancelledRequests[token] = m_readyRequests[token];
|
Chris@148
|
102 m_readyRequests.erase(token);
|
Chris@148
|
103 } else {
|
Chris@148
|
104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
|
Chris@148
|
105 }
|
Chris@148
|
106
|
Chris@148
|
107 m_mutex.unlock();
|
Chris@148
|
108
|
Chris@148
|
109 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
|
Chris@148
|
111 #endif
|
Chris@148
|
112
|
Chris@148
|
113 m_condition.wakeAll();
|
Chris@148
|
114 }
|
Chris@148
|
115
|
Chris@148
|
116 bool
|
Chris@148
|
117 FileReadThread::isReady(int token)
|
Chris@148
|
118 {
|
Chris@148
|
119 m_mutex.lock();
|
Chris@148
|
120
|
Chris@148
|
121 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
|
Chris@148
|
122
|
Chris@148
|
123 m_mutex.unlock();
|
Chris@148
|
124 return ready;
|
Chris@148
|
125 }
|
Chris@148
|
126
|
Chris@148
|
127 bool
|
Chris@148
|
128 FileReadThread::isCancelled(int token)
|
Chris@148
|
129 {
|
Chris@148
|
130 m_mutex.lock();
|
Chris@148
|
131
|
Chris@148
|
132 bool cancelled =
|
Chris@148
|
133 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
|
Chris@148
|
134 m_newlyCancelled.find(token) == m_newlyCancelled.end();
|
Chris@148
|
135
|
Chris@148
|
136 m_mutex.unlock();
|
Chris@148
|
137 return cancelled;
|
Chris@148
|
138 }
|
Chris@148
|
139
|
Chris@148
|
140 bool
|
Chris@148
|
141 FileReadThread::getRequest(int token, Request &request)
|
Chris@148
|
142 {
|
Chris@148
|
143 m_mutex.lock();
|
Chris@148
|
144
|
Chris@148
|
145 bool found = false;
|
Chris@148
|
146
|
Chris@148
|
147 if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
148 request = m_queue[token];
|
Chris@148
|
149 found = true;
|
Chris@148
|
150 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
151 request = m_cancelledRequests[token];
|
Chris@148
|
152 found = true;
|
Chris@148
|
153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
154 request = m_readyRequests[token];
|
Chris@148
|
155 found = true;
|
Chris@148
|
156 }
|
Chris@148
|
157
|
Chris@148
|
158 m_mutex.unlock();
|
Chris@148
|
159
|
Chris@148
|
160 return found;
|
Chris@148
|
161 }
|
Chris@148
|
162
|
Chris@148
|
163 void
|
Chris@148
|
164 FileReadThread::done(int token)
|
Chris@148
|
165 {
|
Chris@148
|
166 m_mutex.lock();
|
Chris@148
|
167
|
Chris@148
|
168 bool found = false;
|
Chris@148
|
169
|
Chris@148
|
170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
171 m_cancelledRequests.erase(token);
|
Chris@148
|
172 m_newlyCancelled.erase(token);
|
Chris@148
|
173 found = true;
|
Chris@148
|
174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
175 m_readyRequests.erase(token);
|
Chris@148
|
176 found = true;
|
Chris@148
|
177 } else if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
|
Chris@148
|
179 }
|
Chris@148
|
180
|
Chris@148
|
181 m_mutex.unlock();
|
Chris@148
|
182
|
Chris@148
|
183 if (!found) {
|
Chris@148
|
184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
|
Chris@148
|
185 }
|
Chris@148
|
186 }
|
Chris@148
|
187
|
Chris@148
|
188 void
|
Chris@148
|
189 FileReadThread::process()
|
Chris@148
|
190 {
|
Chris@148
|
191 // entered with m_mutex locked and m_queue non-empty
|
Chris@148
|
192
|
Chris@148
|
193 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
194 Profiler profiler("FileReadThread::process()", true);
|
Chris@148
|
195 #endif
|
Chris@148
|
196
|
Chris@148
|
197 int token = m_queue.begin()->first;
|
Chris@148
|
198 Request request = m_queue.begin()->second;
|
Chris@148
|
199
|
Chris@148
|
200 m_mutex.unlock();
|
Chris@148
|
201
|
Chris@148
|
202 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
203 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
|
Chris@148
|
204 #endif
|
Chris@148
|
205
|
Chris@148
|
206 bool successful = false;
|
Chris@148
|
207 bool seekFailed = false;
|
Chris@148
|
208 ssize_t r = 0;
|
Chris@148
|
209
|
Chris@148
|
210 if (request.mutex) request.mutex->lock();
|
Chris@148
|
211
|
Chris@148
|
212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
|
Chris@148
|
213 seekFailed = true;
|
Chris@148
|
214 } else {
|
Chris@148
|
215
|
Chris@148
|
216 // if request.size is large, we want to avoid making a single
|
Chris@148
|
217 // system call to read it all as it may block too much
|
Chris@148
|
218
|
Chris@148
|
219 static const size_t blockSize = 256 * 1024;
|
Chris@148
|
220
|
Chris@148
|
221 size_t size = request.size;
|
Chris@148
|
222 char *destination = request.data;
|
Chris@148
|
223
|
Chris@148
|
224 while (size > 0) {
|
Chris@148
|
225 size_t readSize = size;
|
Chris@148
|
226 if (readSize > blockSize) readSize = blockSize;
|
Chris@148
|
227 ssize_t br = ::read(request.fd, destination, readSize);
|
Chris@148
|
228 if (br < 0) {
|
Chris@148
|
229 r = br;
|
Chris@148
|
230 break;
|
Chris@148
|
231 } else {
|
Chris@148
|
232 r += br;
|
Chris@148
|
233 if (br < ssize_t(readSize)) break;
|
Chris@148
|
234 }
|
Chris@148
|
235 destination += readSize;
|
Chris@148
|
236 size -= readSize;
|
Chris@148
|
237 }
|
Chris@148
|
238 }
|
Chris@148
|
239
|
Chris@148
|
240 if (request.mutex) request.mutex->unlock();
|
Chris@148
|
241
|
Chris@148
|
242 if (seekFailed) {
|
Chris@148
|
243 ::perror("Seek failed");
|
Chris@148
|
244 std::cerr << "ERROR: FileReadThread::process: seek to "
|
Chris@148
|
245 << request.start << " failed" << std::endl;
|
Chris@148
|
246 request.size = 0;
|
Chris@148
|
247 } else {
|
Chris@148
|
248 if (r < 0) {
|
Chris@148
|
249 ::perror("ERROR: FileReadThread::process: Read failed");
|
Chris@148
|
250 request.size = 0;
|
Chris@148
|
251 } else if (r < ssize_t(request.size)) {
|
Chris@148
|
252 std::cerr << "WARNING: FileReadThread::process: read "
|
Chris@148
|
253 << request.size << " returned only " << r << " bytes"
|
Chris@148
|
254 << std::endl;
|
Chris@148
|
255 request.size = r;
|
Chris@148
|
256 usleep(100000);
|
Chris@148
|
257 } else {
|
Chris@148
|
258 successful = true;
|
Chris@148
|
259 }
|
Chris@148
|
260 }
|
Chris@148
|
261
|
Chris@148
|
262 // Check that the token hasn't been cancelled and the thread
|
Chris@148
|
263 // hasn't been asked to finish
|
Chris@148
|
264
|
Chris@148
|
265 m_mutex.lock();
|
Chris@148
|
266
|
Chris@148
|
267 request.successful = successful;
|
Chris@148
|
268
|
Chris@148
|
269 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
|
Chris@148
|
270 m_queue.erase(token);
|
Chris@148
|
271 m_readyRequests[token] = request;
|
Chris@148
|
272 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
273 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
|
Chris@148
|
274 #endif
|
Chris@148
|
275 } else {
|
Chris@148
|
276 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
277 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
|
Chris@148
|
278 #endif
|
Chris@148
|
279 }
|
Chris@148
|
280 }
|
Chris@148
|
281
|
Chris@148
|
282 void
|
Chris@148
|
283 FileReadThread::notifyCancelled()
|
Chris@148
|
284 {
|
Chris@148
|
285 // entered with m_mutex locked
|
Chris@148
|
286
|
Chris@148
|
287 while (!m_newlyCancelled.empty()) {
|
Chris@148
|
288
|
Chris@148
|
289 int token = *m_newlyCancelled.begin();
|
Chris@148
|
290
|
Chris@148
|
291 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
292 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
|
Chris@148
|
293 #endif
|
Chris@148
|
294
|
Chris@148
|
295 m_newlyCancelled.erase(token);
|
Chris@148
|
296 }
|
Chris@148
|
297 }
|
Chris@148
|
298
|
Chris@148
|
299
|