comparison base/FileReadThread.cpp @ 96:1aebdc68ec6d

* Introduce simple non-RT thread base class
* Rename MatrixFileCache to MatrixFile
* some fixes & tidying
author Chris Cannam
date Thu, 04 May 2006 16:03:02 +0000
parents 040a151d0897
children 2b1a16e38d2d
comparison
equal deleted inserted replaced
95:040a151d0897 96:1aebdc68ec6d
31 while (!m_exiting) { 31 while (!m_exiting) {
32 if (m_queue.empty()) { 32 if (m_queue.empty()) {
33 m_condition.wait(&m_mutex, 1000); 33 m_condition.wait(&m_mutex, 1000);
34 } else { 34 } else {
35 process(); 35 process();
36 notifyCancelled();
37 } 36 }
37 notifyCancelled();
38 } 38 }
39 39
40 notifyCancelled(); 40 notifyCancelled();
41 m_mutex.unlock(); 41 m_mutex.unlock();
42 42
84 84
85 if (m_queue.find(token) != m_queue.end()) { 85 if (m_queue.find(token) != m_queue.end()) {
86 m_cancelledRequests[token] = m_queue[token]; 86 m_cancelledRequests[token] = m_queue[token];
87 m_queue.erase(token); 87 m_queue.erase(token);
88 m_newlyCancelled.insert(token); 88 m_newlyCancelled.insert(token);
89 } 89 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
90 90 m_cancelledRequests[token] = m_readyRequests[token];
91 m_mutex.unlock(); 91 m_readyRequests.erase(token);
92 } else {
93 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
94 }
95
96 m_mutex.unlock();
97
98 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
99
100 m_condition.wakeAll();
92 } 101 }
93 102
94 bool 103 bool
95 FileReadThread::isReady(int token) 104 FileReadThread::isReady(int token)
96 { 105 {
182 if (request.mutex) request.mutex->lock(); 191 if (request.mutex) request.mutex->lock();
183 192
184 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { 193 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
185 seekFailed = true; 194 seekFailed = true;
186 } else { 195 } else {
187 r = ::read(request.fd, request.data, request.size); 196
197 // if request.size is large, we want to avoid making a single
198 // system call to read it all as it may block too much
199
200 static const size_t blockSize = 16384;
201
202 size_t size = request.size;
203 char *destination = request.data;
204
205 while (size > 0) {
206 size_t readSize = size;
207 if (readSize > blockSize) readSize = blockSize;
208 ssize_t br = ::read(request.fd, destination, readSize);
209 if (br < 0) {
210 r = br;
211 break;
212 } else {
213 r += br;
214 if (br < ssize_t(readSize)) break;
215 }
216 destination += readSize;
217 size -= readSize;
218 }
188 } 219 }
189 220
190 if (request.mutex) request.mutex->unlock(); 221 if (request.mutex) request.mutex->unlock();
191 222
192 if (seekFailed) { 223 if (seekFailed) {
227 FileReadThread::notifyCancelled() 258 FileReadThread::notifyCancelled()
228 { 259 {
229 // entered with m_mutex locked 260 // entered with m_mutex locked
230 261
231 while (!m_newlyCancelled.empty()) { 262 while (!m_newlyCancelled.empty()) {
263
232 int token = *m_newlyCancelled.begin(); 264 int token = *m_newlyCancelled.begin();
265
266 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
267
233 m_newlyCancelled.erase(token); 268 m_newlyCancelled.erase(token);
234 emit cancelled(token); 269 emit cancelled(token);
235 } 270 }
236 } 271 }
237 272