Chris@148
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
Chris@148
|
2
|
Chris@148
|
3 /*
|
Chris@148
|
4 Sonic Visualiser
|
Chris@148
|
5 An audio file viewer and annotation editor.
|
Chris@148
|
6 Centre for Digital Music, Queen Mary, University of London.
|
Chris@148
|
7 This file copyright 2006 Chris Cannam.
|
Chris@148
|
8
|
Chris@148
|
9 This program is free software; you can redistribute it and/or
|
Chris@148
|
10 modify it under the terms of the GNU General Public License as
|
Chris@148
|
11 published by the Free Software Foundation; either version 2 of the
|
Chris@148
|
12 License, or (at your option) any later version. See the file
|
Chris@148
|
13 COPYING included with this distribution for more information.
|
Chris@148
|
14 */
|
Chris@148
|
15
|
Chris@148
|
16 #include "FileReadThread.h"
|
Chris@148
|
17
|
Chris@148
|
18 #include "base/Profiler.h"
|
Chris@408
|
19 #include "base/Thread.h"
|
Chris@148
|
20
|
Chris@148
|
21 #include <iostream>
|
Chris@148
|
22 #include <unistd.h>
|
Chris@148
|
23
|
Chris@453
|
24 #define DEBUG_FILE_READ_THREAD 1
|
Chris@148
|
25
|
Chris@148
|
26 FileReadThread::FileReadThread() :
|
Chris@148
|
27 m_nextToken(0),
|
Chris@148
|
28 m_exiting(false)
|
Chris@148
|
29 {
|
Chris@148
|
30 }
|
Chris@148
|
31
|
Chris@148
|
32 void
|
Chris@148
|
33 FileReadThread::run()
|
Chris@148
|
34 {
|
Chris@408
|
35 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
|
Chris@148
|
36
|
Chris@148
|
37 while (!m_exiting) {
|
Chris@148
|
38 if (m_queue.empty()) {
|
Chris@148
|
39 m_condition.wait(&m_mutex, 1000);
|
Chris@148
|
40 } else {
|
Chris@148
|
41 process();
|
Chris@148
|
42 }
|
Chris@148
|
43 notifyCancelled();
|
Chris@148
|
44 }
|
Chris@148
|
45
|
Chris@148
|
46 notifyCancelled();
|
Chris@148
|
47
|
Chris@148
|
48 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
49 std::cerr << "FileReadThread::run() exiting" << std::endl;
|
Chris@148
|
50 #endif
|
Chris@148
|
51 }
|
Chris@148
|
52
|
Chris@148
|
53 void
|
Chris@148
|
54 FileReadThread::finish()
|
Chris@148
|
55 {
|
Chris@148
|
56 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
57 std::cerr << "FileReadThread::finish()" << std::endl;
|
Chris@148
|
58 #endif
|
Chris@148
|
59
|
Chris@408
|
60 {
|
Chris@408
|
61 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
|
Chris@408
|
62
|
Chris@408
|
63 while (!m_queue.empty()) {
|
Chris@408
|
64 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
|
Chris@408
|
65 m_newlyCancelled.insert(m_queue.begin()->first);
|
Chris@408
|
66 m_queue.erase(m_queue.begin());
|
Chris@408
|
67 }
|
Chris@408
|
68
|
Chris@408
|
69 m_exiting = true;
|
Chris@148
|
70 }
|
Chris@148
|
71
|
Chris@148
|
72 m_condition.wakeAll();
|
Chris@148
|
73
|
Chris@148
|
74 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
75 std::cerr << "FileReadThread::finish() exiting" << std::endl;
|
Chris@148
|
76 #endif
|
Chris@148
|
77 }
|
Chris@148
|
78
|
Chris@148
|
79 int
|
Chris@148
|
80 FileReadThread::request(const Request &request)
|
Chris@148
|
81 {
|
Chris@408
|
82 int token;
|
Chris@408
|
83
|
Chris@408
|
84 {
|
Chris@408
|
85 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
|
Chris@148
|
86
|
Chris@408
|
87 token = m_nextToken++;
|
Chris@408
|
88 m_queue[token] = request;
|
Chris@408
|
89 }
|
Chris@148
|
90
|
Chris@148
|
91 m_condition.wakeAll();
|
Chris@148
|
92
|
Chris@148
|
93 return token;
|
Chris@148
|
94 }
|
Chris@148
|
95
|
Chris@148
|
96 void
|
Chris@148
|
97 FileReadThread::cancel(int token)
|
Chris@148
|
98 {
|
Chris@408
|
99 {
|
Chris@408
|
100 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
|
Chris@148
|
101
|
Chris@408
|
102 if (m_queue.find(token) != m_queue.end()) {
|
Chris@408
|
103 m_cancelledRequests[token] = m_queue[token];
|
Chris@408
|
104 m_queue.erase(token);
|
Chris@408
|
105 m_newlyCancelled.insert(token);
|
Chris@408
|
106 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@408
|
107 m_cancelledRequests[token] = m_readyRequests[token];
|
Chris@408
|
108 m_readyRequests.erase(token);
|
Chris@408
|
109 } else {
|
Chris@408
|
110 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
|
Chris@408
|
111 }
|
Chris@148
|
112 }
|
Chris@148
|
113
|
Chris@148
|
114 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
115 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
|
Chris@148
|
116 #endif
|
Chris@148
|
117
|
Chris@148
|
118 m_condition.wakeAll();
|
Chris@148
|
119 }
|
Chris@148
|
120
|
Chris@148
|
121 bool
|
Chris@148
|
122 FileReadThread::isReady(int token)
|
Chris@148
|
123 {
|
Chris@408
|
124 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
|
Chris@148
|
125
|
Chris@148
|
126 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
|
Chris@148
|
127
|
Chris@148
|
128 return ready;
|
Chris@148
|
129 }
|
Chris@148
|
130
|
Chris@148
|
131 bool
|
Chris@148
|
132 FileReadThread::isCancelled(int token)
|
Chris@148
|
133 {
|
Chris@408
|
134 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
|
Chris@148
|
135
|
Chris@148
|
136 bool cancelled =
|
Chris@148
|
137 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
|
Chris@148
|
138 m_newlyCancelled.find(token) == m_newlyCancelled.end();
|
Chris@148
|
139
|
Chris@148
|
140 return cancelled;
|
Chris@148
|
141 }
|
Chris@148
|
142
|
Chris@148
|
143 bool
|
Chris@148
|
144 FileReadThread::getRequest(int token, Request &request)
|
Chris@148
|
145 {
|
Chris@408
|
146 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
|
Chris@148
|
147
|
Chris@148
|
148 bool found = false;
|
Chris@148
|
149
|
Chris@148
|
150 if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
151 request = m_queue[token];
|
Chris@148
|
152 found = true;
|
Chris@148
|
153 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
154 request = m_cancelledRequests[token];
|
Chris@148
|
155 found = true;
|
Chris@148
|
156 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
157 request = m_readyRequests[token];
|
Chris@148
|
158 found = true;
|
Chris@148
|
159 }
|
Chris@148
|
160
|
Chris@148
|
161 return found;
|
Chris@148
|
162 }
|
Chris@148
|
163
|
Chris@148
|
164 void
|
Chris@148
|
165 FileReadThread::done(int token)
|
Chris@148
|
166 {
|
Chris@408
|
167 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
|
Chris@148
|
168
|
Chris@148
|
169 bool found = false;
|
Chris@148
|
170
|
Chris@148
|
171 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
|
Chris@148
|
172 m_cancelledRequests.erase(token);
|
Chris@148
|
173 m_newlyCancelled.erase(token);
|
Chris@148
|
174 found = true;
|
Chris@148
|
175 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
|
Chris@148
|
176 m_readyRequests.erase(token);
|
Chris@148
|
177 found = true;
|
Chris@148
|
178 } else if (m_queue.find(token) != m_queue.end()) {
|
Chris@148
|
179 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
|
Chris@148
|
180 }
|
Chris@148
|
181
|
Chris@148
|
182 if (!found) {
|
Chris@148
|
183 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
|
Chris@148
|
184 }
|
Chris@148
|
185 }
|
Chris@148
|
186
|
Chris@148
|
187 void
|
Chris@148
|
188 FileReadThread::process()
|
Chris@148
|
189 {
|
Chris@148
|
190 // entered with m_mutex locked and m_queue non-empty
|
Chris@148
|
191
|
Chris@408
|
192 Profiler profiler("FileReadThread::process", true);
|
Chris@148
|
193
|
Chris@148
|
194 int token = m_queue.begin()->first;
|
Chris@148
|
195 Request request = m_queue.begin()->second;
|
Chris@148
|
196
|
Chris@148
|
197 m_mutex.unlock();
|
Chris@148
|
198
|
Chris@148
|
199 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
200 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
|
Chris@148
|
201 #endif
|
Chris@148
|
202
|
Chris@148
|
203 bool successful = false;
|
Chris@148
|
204 bool seekFailed = false;
|
Chris@148
|
205 ssize_t r = 0;
|
Chris@148
|
206
|
Chris@408
|
207 {
|
Chris@408
|
208 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
|
Chris@148
|
209
|
Chris@408
|
210 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
|
Chris@408
|
211 seekFailed = true;
|
Chris@408
|
212 } else {
|
Chris@148
|
213
|
Chris@408
|
214 // if request.size is large, we want to avoid making a single
|
Chris@408
|
215 // system call to read it all as it may block too much
|
Chris@408
|
216
|
Chris@408
|
217 static const size_t blockSize = 256 * 1024;
|
Chris@408
|
218
|
Chris@408
|
219 size_t size = request.size;
|
Chris@408
|
220 char *destination = request.data;
|
Chris@408
|
221
|
Chris@408
|
222 while (size > 0) {
|
Chris@408
|
223 size_t readSize = size;
|
Chris@408
|
224 if (readSize > blockSize) readSize = blockSize;
|
Chris@408
|
225 ssize_t br = ::read(request.fd, destination, readSize);
|
Chris@408
|
226 if (br < 0) {
|
Chris@408
|
227 r = br;
|
Chris@408
|
228 break;
|
Chris@408
|
229 } else {
|
Chris@408
|
230 r += br;
|
Chris@408
|
231 if (br < ssize_t(readSize)) break;
|
Chris@408
|
232 }
|
Chris@408
|
233 destination += readSize;
|
Chris@408
|
234 size -= readSize;
|
Chris@148
|
235 }
|
Chris@148
|
236 }
|
Chris@148
|
237 }
|
Chris@148
|
238
|
Chris@148
|
239 if (seekFailed) {
|
Chris@148
|
240 ::perror("Seek failed");
|
Chris@148
|
241 std::cerr << "ERROR: FileReadThread::process: seek to "
|
Chris@148
|
242 << request.start << " failed" << std::endl;
|
Chris@148
|
243 request.size = 0;
|
Chris@148
|
244 } else {
|
Chris@148
|
245 if (r < 0) {
|
Chris@148
|
246 ::perror("ERROR: FileReadThread::process: Read failed");
|
Chris@148
|
247 request.size = 0;
|
Chris@148
|
248 } else if (r < ssize_t(request.size)) {
|
Chris@148
|
249 std::cerr << "WARNING: FileReadThread::process: read "
|
Chris@148
|
250 << request.size << " returned only " << r << " bytes"
|
Chris@148
|
251 << std::endl;
|
Chris@148
|
252 request.size = r;
|
Chris@148
|
253 usleep(100000);
|
Chris@148
|
254 } else {
|
Chris@148
|
255 successful = true;
|
Chris@148
|
256 }
|
Chris@148
|
257 }
|
Chris@148
|
258
|
Chris@148
|
259 // Check that the token hasn't been cancelled and the thread
|
Chris@148
|
260 // hasn't been asked to finish
|
Chris@148
|
261
|
Chris@148
|
262 m_mutex.lock();
|
Chris@148
|
263
|
Chris@148
|
264 request.successful = successful;
|
Chris@148
|
265
|
Chris@148
|
266 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
|
Chris@148
|
267 m_queue.erase(token);
|
Chris@148
|
268 m_readyRequests[token] = request;
|
Chris@148
|
269 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
270 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
|
Chris@148
|
271 #endif
|
Chris@148
|
272 } else {
|
Chris@148
|
273 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
274 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
|
Chris@148
|
275 #endif
|
Chris@148
|
276 }
|
Chris@148
|
277 }
|
Chris@148
|
278
|
Chris@148
|
279 void
|
Chris@148
|
280 FileReadThread::notifyCancelled()
|
Chris@148
|
281 {
|
Chris@148
|
282 // entered with m_mutex locked
|
Chris@148
|
283
|
Chris@148
|
284 while (!m_newlyCancelled.empty()) {
|
Chris@148
|
285
|
Chris@148
|
286 int token = *m_newlyCancelled.begin();
|
Chris@148
|
287
|
Chris@148
|
288 #ifdef DEBUG_FILE_READ_THREAD
|
Chris@148
|
289 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
|
Chris@148
|
290 #endif
|
Chris@148
|
291
|
Chris@148
|
292 m_newlyCancelled.erase(token);
|
Chris@148
|
293 }
|
Chris@148
|
294 }
|
Chris@148
|
295
|
Chris@148
|
296
|