Mercurial > hg > svcore
comparison base/FileReadThread.cpp @ 96:1aebdc68ec6d
* Introduce simple non-RT thread base class
* Rename MatrixFileCache to MatrixFile
* some fixes & tidying
author | Chris Cannam |
---|---|
date | Thu, 04 May 2006 16:03:02 +0000 |
parents | 040a151d0897 |
children | 2b1a16e38d2d |
comparison
equal
deleted
inserted
replaced
95:040a151d0897 | 96:1aebdc68ec6d |
---|---|
31 while (!m_exiting) { | 31 while (!m_exiting) { |
32 if (m_queue.empty()) { | 32 if (m_queue.empty()) { |
33 m_condition.wait(&m_mutex, 1000); | 33 m_condition.wait(&m_mutex, 1000); |
34 } else { | 34 } else { |
35 process(); | 35 process(); |
36 notifyCancelled(); | |
37 } | 36 } |
37 notifyCancelled(); | |
38 } | 38 } |
39 | 39 |
40 notifyCancelled(); | 40 notifyCancelled(); |
41 m_mutex.unlock(); | 41 m_mutex.unlock(); |
42 | 42 |
84 | 84 |
85 if (m_queue.find(token) != m_queue.end()) { | 85 if (m_queue.find(token) != m_queue.end()) { |
86 m_cancelledRequests[token] = m_queue[token]; | 86 m_cancelledRequests[token] = m_queue[token]; |
87 m_queue.erase(token); | 87 m_queue.erase(token); |
88 m_newlyCancelled.insert(token); | 88 m_newlyCancelled.insert(token); |
89 } | 89 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { |
90 | 90 m_cancelledRequests[token] = m_readyRequests[token]; |
91 m_mutex.unlock(); | 91 m_readyRequests.erase(token); |
92 } else { | |
93 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; | |
94 } | |
95 | |
96 m_mutex.unlock(); | |
97 | |
98 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; | |
99 | |
100 m_condition.wakeAll(); | |
92 } | 101 } |
93 | 102 |
94 bool | 103 bool |
95 FileReadThread::isReady(int token) | 104 FileReadThread::isReady(int token) |
96 { | 105 { |
182 if (request.mutex) request.mutex->lock(); | 191 if (request.mutex) request.mutex->lock(); |
183 | 192 |
184 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { | 193 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { |
185 seekFailed = true; | 194 seekFailed = true; |
186 } else { | 195 } else { |
187 r = ::read(request.fd, request.data, request.size); | 196 |
197 // if request.size is large, we want to avoid making a single | |
198 // system call to read it all as it may block too much | |
199 | |
200 static const size_t blockSize = 16384; | |
201 | |
202 size_t size = request.size; | |
203 char *destination = request.data; | |
204 | |
205 while (size > 0) { | |
206 size_t readSize = size; | |
207 if (readSize > blockSize) readSize = blockSize; | |
208 ssize_t br = ::read(request.fd, destination, readSize); | |
209 if (br < 0) { | |
210 r = br; | |
211 break; | |
212 } else { | |
213 r += br; | |
214 if (br < ssize_t(readSize)) break; | |
215 } | |
216 destination += readSize; | |
217 size -= readSize; | |
218 } | |
188 } | 219 } |
189 | 220 |
190 if (request.mutex) request.mutex->unlock(); | 221 if (request.mutex) request.mutex->unlock(); |
191 | 222 |
192 if (seekFailed) { | 223 if (seekFailed) { |
227 FileReadThread::notifyCancelled() | 258 FileReadThread::notifyCancelled() |
228 { | 259 { |
229 // entered with m_mutex locked | 260 // entered with m_mutex locked |
230 | 261 |
231 while (!m_newlyCancelled.empty()) { | 262 while (!m_newlyCancelled.empty()) { |
263 | |
232 int token = *m_newlyCancelled.begin(); | 264 int token = *m_newlyCancelled.begin(); |
265 | |
266 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl; | |
267 | |
233 m_newlyCancelled.erase(token); | 268 m_newlyCancelled.erase(token); |
234 emit cancelled(token); | 269 emit cancelled(token); |
235 } | 270 } |
236 } | 271 } |
237 | 272 |