diff base/FileReadThread.cpp @ 96:1aebdc68ec6d

* Introduce simple non-RT thread base class * Rename MatrixFileCache to MatrixFile * some fixes & tidying
author Chris Cannam
date Thu, 04 May 2006 16:03:02 +0000
parents 040a151d0897
children 2b1a16e38d2d
line wrap: on
line diff
--- a/base/FileReadThread.cpp	Thu May 04 13:59:57 2006 +0000
+++ b/base/FileReadThread.cpp	Thu May 04 16:03:02 2006 +0000
@@ -33,8 +33,8 @@
             m_condition.wait(&m_mutex, 1000);
         } else {
             process();
-            notifyCancelled();
         }
+        notifyCancelled();
     }
 
     notifyCancelled();
@@ -86,9 +86,18 @@
         m_cancelledRequests[token] = m_queue[token];
         m_queue.erase(token);
         m_newlyCancelled.insert(token);
+    } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
+        m_cancelledRequests[token] = m_readyRequests[token];
+        m_readyRequests.erase(token);
+    } else {
+        std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
     }
 
     m_mutex.unlock();
+
+    std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
+
+    m_condition.wakeAll();
 }
 
 bool
@@ -184,7 +193,29 @@
     if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
         seekFailed = true;
     } else {
-        r = ::read(request.fd, request.data, request.size);
+        
+        // if request.size is large, we want to avoid making a single
+        // system call to read it all as it may block too much
+
+        static const size_t blockSize = 16384;
+        
+        size_t size = request.size;
+        char *destination = request.data;
+
+        while (size > 0) {
+            size_t readSize = size;
+            if (readSize > blockSize) readSize = blockSize;
+            ssize_t br = ::read(request.fd, destination, readSize);
+            if (br < 0) { 
+                r = br;
+                break;
+            } else {
+                r += br;
+                if (br < ssize_t(readSize)) break;
+            }
+            destination += readSize;
+            size -= readSize;
+        }
     }
 
     if (request.mutex) request.mutex->unlock();
@@ -229,7 +260,11 @@
     // entered with m_mutex locked
 
     while (!m_newlyCancelled.empty()) {
+
         int token = *m_newlyCancelled.begin();
+
+        std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
+
         m_newlyCancelled.erase(token);
         emit cancelled(token);
     }