Mercurial > hg > svcore
comparison data/fileio/FileReadThread.cpp @ 148:1a42221a1522
* Reorganising code base. This revision will not compile.
author | Chris Cannam |
---|---|
date | Mon, 31 Jul 2006 11:49:58 +0000 |
parents | |
children | 115f60df1e4d |
comparison
equal
deleted
inserted
replaced
147:3a13b0d4934e | 148:1a42221a1522 |
---|---|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ | |
2 | |
3 /* | |
4 Sonic Visualiser | |
5 An audio file viewer and annotation editor. | |
6 Centre for Digital Music, Queen Mary, University of London. | |
7 This file copyright 2006 Chris Cannam. | |
8 | |
9 This program is free software; you can redistribute it and/or | |
10 modify it under the terms of the GNU General Public License as | |
11 published by the Free Software Foundation; either version 2 of the | |
12 License, or (at your option) any later version. See the file | |
13 COPYING included with this distribution for more information. | |
14 */ | |
15 | |
16 #include "FileReadThread.h" | |
17 | |
18 #include "base/Profiler.h" | |
19 | |
20 #include <iostream> | |
21 #include <unistd.h> | |
22 | |
23 //#define DEBUG_FILE_READ_THREAD 1 | |
24 | |
25 FileReadThread::FileReadThread() : | |
26 m_nextToken(0), | |
27 m_exiting(false) | |
28 { | |
29 } | |
30 | |
31 void | |
32 FileReadThread::run() | |
33 { | |
34 m_mutex.lock(); | |
35 | |
36 while (!m_exiting) { | |
37 if (m_queue.empty()) { | |
38 m_condition.wait(&m_mutex, 1000); | |
39 } else { | |
40 process(); | |
41 } | |
42 notifyCancelled(); | |
43 } | |
44 | |
45 notifyCancelled(); | |
46 m_mutex.unlock(); | |
47 | |
48 #ifdef DEBUG_FILE_READ_THREAD | |
49 std::cerr << "FileReadThread::run() exiting" << std::endl; | |
50 #endif | |
51 } | |
52 | |
53 void | |
54 FileReadThread::finish() | |
55 { | |
56 #ifdef DEBUG_FILE_READ_THREAD | |
57 std::cerr << "FileReadThread::finish()" << std::endl; | |
58 #endif | |
59 | |
60 m_mutex.lock(); | |
61 while (!m_queue.empty()) { | |
62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; | |
63 m_newlyCancelled.insert(m_queue.begin()->first); | |
64 m_queue.erase(m_queue.begin()); | |
65 } | |
66 | |
67 m_exiting = true; | |
68 m_mutex.unlock(); | |
69 | |
70 m_condition.wakeAll(); | |
71 | |
72 #ifdef DEBUG_FILE_READ_THREAD | |
73 std::cerr << "FileReadThread::finish() exiting" << std::endl; | |
74 #endif | |
75 } | |
76 | |
77 int | |
78 FileReadThread::request(const Request &request) | |
79 { | |
80 m_mutex.lock(); | |
81 | |
82 int token = m_nextToken++; | |
83 m_queue[token] = request; | |
84 | |
85 m_mutex.unlock(); | |
86 m_condition.wakeAll(); | |
87 | |
88 return token; | |
89 } | |
90 | |
91 void | |
92 FileReadThread::cancel(int token) | |
93 { | |
94 m_mutex.lock(); | |
95 | |
96 if (m_queue.find(token) != m_queue.end()) { | |
97 m_cancelledRequests[token] = m_queue[token]; | |
98 m_queue.erase(token); | |
99 m_newlyCancelled.insert(token); | |
100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { | |
101 m_cancelledRequests[token] = m_readyRequests[token]; | |
102 m_readyRequests.erase(token); | |
103 } else { | |
104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; | |
105 } | |
106 | |
107 m_mutex.unlock(); | |
108 | |
109 #ifdef DEBUG_FILE_READ_THREAD | |
110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; | |
111 #endif | |
112 | |
113 m_condition.wakeAll(); | |
114 } | |
115 | |
116 bool | |
117 FileReadThread::isReady(int token) | |
118 { | |
119 m_mutex.lock(); | |
120 | |
121 bool ready = m_readyRequests.find(token) != m_readyRequests.end(); | |
122 | |
123 m_mutex.unlock(); | |
124 return ready; | |
125 } | |
126 | |
127 bool | |
128 FileReadThread::isCancelled(int token) | |
129 { | |
130 m_mutex.lock(); | |
131 | |
132 bool cancelled = | |
133 m_cancelledRequests.find(token) != m_cancelledRequests.end() && | |
134 m_newlyCancelled.find(token) == m_newlyCancelled.end(); | |
135 | |
136 m_mutex.unlock(); | |
137 return cancelled; | |
138 } | |
139 | |
140 bool | |
141 FileReadThread::getRequest(int token, Request &request) | |
142 { | |
143 m_mutex.lock(); | |
144 | |
145 bool found = false; | |
146 | |
147 if (m_queue.find(token) != m_queue.end()) { | |
148 request = m_queue[token]; | |
149 found = true; | |
150 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { | |
151 request = m_cancelledRequests[token]; | |
152 found = true; | |
153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { | |
154 request = m_readyRequests[token]; | |
155 found = true; | |
156 } | |
157 | |
158 m_mutex.unlock(); | |
159 | |
160 return found; | |
161 } | |
162 | |
163 void | |
164 FileReadThread::done(int token) | |
165 { | |
166 m_mutex.lock(); | |
167 | |
168 bool found = false; | |
169 | |
170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { | |
171 m_cancelledRequests.erase(token); | |
172 m_newlyCancelled.erase(token); | |
173 found = true; | |
174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { | |
175 m_readyRequests.erase(token); | |
176 found = true; | |
177 } else if (m_queue.find(token) != m_queue.end()) { | |
178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; | |
179 } | |
180 | |
181 m_mutex.unlock(); | |
182 | |
183 if (!found) { | |
184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; | |
185 } | |
186 } | |
187 | |
188 void | |
189 FileReadThread::process() | |
190 { | |
191 // entered with m_mutex locked and m_queue non-empty | |
192 | |
193 #ifdef DEBUG_FILE_READ_THREAD | |
194 Profiler profiler("FileReadThread::process()", true); | |
195 #endif | |
196 | |
197 int token = m_queue.begin()->first; | |
198 Request request = m_queue.begin()->second; | |
199 | |
200 m_mutex.unlock(); | |
201 | |
202 #ifdef DEBUG_FILE_READ_THREAD | |
203 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl; | |
204 #endif | |
205 | |
206 bool successful = false; | |
207 bool seekFailed = false; | |
208 ssize_t r = 0; | |
209 | |
210 if (request.mutex) request.mutex->lock(); | |
211 | |
212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { | |
213 seekFailed = true; | |
214 } else { | |
215 | |
216 // if request.size is large, we want to avoid making a single | |
217 // system call to read it all as it may block too much | |
218 | |
219 static const size_t blockSize = 256 * 1024; | |
220 | |
221 size_t size = request.size; | |
222 char *destination = request.data; | |
223 | |
224 while (size > 0) { | |
225 size_t readSize = size; | |
226 if (readSize > blockSize) readSize = blockSize; | |
227 ssize_t br = ::read(request.fd, destination, readSize); | |
228 if (br < 0) { | |
229 r = br; | |
230 break; | |
231 } else { | |
232 r += br; | |
233 if (br < ssize_t(readSize)) break; | |
234 } | |
235 destination += readSize; | |
236 size -= readSize; | |
237 } | |
238 } | |
239 | |
240 if (request.mutex) request.mutex->unlock(); | |
241 | |
242 if (seekFailed) { | |
243 ::perror("Seek failed"); | |
244 std::cerr << "ERROR: FileReadThread::process: seek to " | |
245 << request.start << " failed" << std::endl; | |
246 request.size = 0; | |
247 } else { | |
248 if (r < 0) { | |
249 ::perror("ERROR: FileReadThread::process: Read failed"); | |
250 request.size = 0; | |
251 } else if (r < ssize_t(request.size)) { | |
252 std::cerr << "WARNING: FileReadThread::process: read " | |
253 << request.size << " returned only " << r << " bytes" | |
254 << std::endl; | |
255 request.size = r; | |
256 usleep(100000); | |
257 } else { | |
258 successful = true; | |
259 } | |
260 } | |
261 | |
262 // Check that the token hasn't been cancelled and the thread | |
263 // hasn't been asked to finish | |
264 | |
265 m_mutex.lock(); | |
266 | |
267 request.successful = successful; | |
268 | |
269 if (m_queue.find(token) != m_queue.end() && !m_exiting) { | |
270 m_queue.erase(token); | |
271 m_readyRequests[token] = request; | |
272 #ifdef DEBUG_FILE_READ_THREAD | |
273 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl; | |
274 #endif | |
275 } else { | |
276 #ifdef DEBUG_FILE_READ_THREAD | |
277 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl; | |
278 #endif | |
279 } | |
280 } | |
281 | |
282 void | |
283 FileReadThread::notifyCancelled() | |
284 { | |
285 // entered with m_mutex locked | |
286 | |
287 while (!m_newlyCancelled.empty()) { | |
288 | |
289 int token = *m_newlyCancelled.begin(); | |
290 | |
291 #ifdef DEBUG_FILE_READ_THREAD | |
292 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl; | |
293 #endif | |
294 | |
295 m_newlyCancelled.erase(token); | |
296 } | |
297 } | |
298 | |
299 |