comparison data/fileio/FileReadThread.cpp @ 148:1a42221a1522

* Reorganising code base. This revision will not compile.
author Chris Cannam
date Mon, 31 Jul 2006 11:49:58 +0000
parents
children 115f60df1e4d
comparison
equal deleted inserted replaced
147:3a13b0d4934e 148:1a42221a1522
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
2
3 /*
4 Sonic Visualiser
5 An audio file viewer and annotation editor.
6 Centre for Digital Music, Queen Mary, University of London.
7 This file copyright 2006 Chris Cannam.
8
9 This program is free software; you can redistribute it and/or
10 modify it under the terms of the GNU General Public License as
11 published by the Free Software Foundation; either version 2 of the
12 License, or (at your option) any later version. See the file
13 COPYING included with this distribution for more information.
14 */
15
16 #include "FileReadThread.h"
17
18 #include "base/Profiler.h"
19
20 #include <iostream>
21 #include <unistd.h>
22
23 //#define DEBUG_FILE_READ_THREAD 1
24
25 FileReadThread::FileReadThread() :
26 m_nextToken(0),
27 m_exiting(false)
28 {
29 }
30
31 void
32 FileReadThread::run()
33 {
34 m_mutex.lock();
35
36 while (!m_exiting) {
37 if (m_queue.empty()) {
38 m_condition.wait(&m_mutex, 1000);
39 } else {
40 process();
41 }
42 notifyCancelled();
43 }
44
45 notifyCancelled();
46 m_mutex.unlock();
47
48 #ifdef DEBUG_FILE_READ_THREAD
49 std::cerr << "FileReadThread::run() exiting" << std::endl;
50 #endif
51 }
52
53 void
54 FileReadThread::finish()
55 {
56 #ifdef DEBUG_FILE_READ_THREAD
57 std::cerr << "FileReadThread::finish()" << std::endl;
58 #endif
59
60 m_mutex.lock();
61 while (!m_queue.empty()) {
62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
63 m_newlyCancelled.insert(m_queue.begin()->first);
64 m_queue.erase(m_queue.begin());
65 }
66
67 m_exiting = true;
68 m_mutex.unlock();
69
70 m_condition.wakeAll();
71
72 #ifdef DEBUG_FILE_READ_THREAD
73 std::cerr << "FileReadThread::finish() exiting" << std::endl;
74 #endif
75 }
76
77 int
78 FileReadThread::request(const Request &request)
79 {
80 m_mutex.lock();
81
82 int token = m_nextToken++;
83 m_queue[token] = request;
84
85 m_mutex.unlock();
86 m_condition.wakeAll();
87
88 return token;
89 }
90
91 void
92 FileReadThread::cancel(int token)
93 {
94 m_mutex.lock();
95
96 if (m_queue.find(token) != m_queue.end()) {
97 m_cancelledRequests[token] = m_queue[token];
98 m_queue.erase(token);
99 m_newlyCancelled.insert(token);
100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
101 m_cancelledRequests[token] = m_readyRequests[token];
102 m_readyRequests.erase(token);
103 } else {
104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
105 }
106
107 m_mutex.unlock();
108
109 #ifdef DEBUG_FILE_READ_THREAD
110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
111 #endif
112
113 m_condition.wakeAll();
114 }
115
116 bool
117 FileReadThread::isReady(int token)
118 {
119 m_mutex.lock();
120
121 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
122
123 m_mutex.unlock();
124 return ready;
125 }
126
127 bool
128 FileReadThread::isCancelled(int token)
129 {
130 m_mutex.lock();
131
132 bool cancelled =
133 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
134 m_newlyCancelled.find(token) == m_newlyCancelled.end();
135
136 m_mutex.unlock();
137 return cancelled;
138 }
139
140 bool
141 FileReadThread::getRequest(int token, Request &request)
142 {
143 m_mutex.lock();
144
145 bool found = false;
146
147 if (m_queue.find(token) != m_queue.end()) {
148 request = m_queue[token];
149 found = true;
150 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
151 request = m_cancelledRequests[token];
152 found = true;
153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
154 request = m_readyRequests[token];
155 found = true;
156 }
157
158 m_mutex.unlock();
159
160 return found;
161 }
162
163 void
164 FileReadThread::done(int token)
165 {
166 m_mutex.lock();
167
168 bool found = false;
169
170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
171 m_cancelledRequests.erase(token);
172 m_newlyCancelled.erase(token);
173 found = true;
174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
175 m_readyRequests.erase(token);
176 found = true;
177 } else if (m_queue.find(token) != m_queue.end()) {
178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
179 }
180
181 m_mutex.unlock();
182
183 if (!found) {
184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
185 }
186 }
187
188 void
189 FileReadThread::process()
190 {
191 // entered with m_mutex locked and m_queue non-empty
192
193 #ifdef DEBUG_FILE_READ_THREAD
194 Profiler profiler("FileReadThread::process()", true);
195 #endif
196
197 int token = m_queue.begin()->first;
198 Request request = m_queue.begin()->second;
199
200 m_mutex.unlock();
201
202 #ifdef DEBUG_FILE_READ_THREAD
203 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
204 #endif
205
206 bool successful = false;
207 bool seekFailed = false;
208 ssize_t r = 0;
209
210 if (request.mutex) request.mutex->lock();
211
212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
213 seekFailed = true;
214 } else {
215
216 // if request.size is large, we want to avoid making a single
217 // system call to read it all as it may block too much
218
219 static const size_t blockSize = 256 * 1024;
220
221 size_t size = request.size;
222 char *destination = request.data;
223
224 while (size > 0) {
225 size_t readSize = size;
226 if (readSize > blockSize) readSize = blockSize;
227 ssize_t br = ::read(request.fd, destination, readSize);
228 if (br < 0) {
229 r = br;
230 break;
231 } else {
232 r += br;
233 if (br < ssize_t(readSize)) break;
234 }
235 destination += readSize;
236 size -= readSize;
237 }
238 }
239
240 if (request.mutex) request.mutex->unlock();
241
242 if (seekFailed) {
243 ::perror("Seek failed");
244 std::cerr << "ERROR: FileReadThread::process: seek to "
245 << request.start << " failed" << std::endl;
246 request.size = 0;
247 } else {
248 if (r < 0) {
249 ::perror("ERROR: FileReadThread::process: Read failed");
250 request.size = 0;
251 } else if (r < ssize_t(request.size)) {
252 std::cerr << "WARNING: FileReadThread::process: read "
253 << request.size << " returned only " << r << " bytes"
254 << std::endl;
255 request.size = r;
256 usleep(100000);
257 } else {
258 successful = true;
259 }
260 }
261
262 // Check that the token hasn't been cancelled and the thread
263 // hasn't been asked to finish
264
265 m_mutex.lock();
266
267 request.successful = successful;
268
269 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
270 m_queue.erase(token);
271 m_readyRequests[token] = request;
272 #ifdef DEBUG_FILE_READ_THREAD
273 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
274 #endif
275 } else {
276 #ifdef DEBUG_FILE_READ_THREAD
277 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
278 #endif
279 }
280 }
281
282 void
283 FileReadThread::notifyCancelled()
284 {
285 // entered with m_mutex locked
286
287 while (!m_newlyCancelled.empty()) {
288
289 int token = *m_newlyCancelled.begin();
290
291 #ifdef DEBUG_FILE_READ_THREAD
292 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
293 #endif
294
295 m_newlyCancelled.erase(token);
296 }
297 }
298
299