comparison data/fileio/FileReadThread.cpp @ 408:115f60df1e4d

* Speed up spectrogram painting by releasing mutex in FFTDataServer while calculating data prior to writing it, and by adding whole-column value query methods to FFT objects
* Add paint cache to Thumbwheel -- repaints of this widget were slowing down the whole spectrogram repaint
* More uses of MutexLocker (named and with debug) and more profile points
* Make startup much quicker some of the time, with OSC server in place
author Chris Cannam
date Thu, 08 May 2008 14:46:22 +0000
parents 1a42221a1522
children 1f15beefcd76
comparison
equal deleted inserted replaced
407:88ad01799040 408:115f60df1e4d
14 */ 14 */
15 15
16 #include "FileReadThread.h" 16 #include "FileReadThread.h"
17 17
18 #include "base/Profiler.h" 18 #include "base/Profiler.h"
19 #include "base/Thread.h"
19 20
20 #include <iostream> 21 #include <iostream>
21 #include <unistd.h> 22 #include <unistd.h>
22 23
23 //#define DEBUG_FILE_READ_THREAD 1 24 //#define DEBUG_FILE_READ_THREAD 1
29 } 30 }
30 31
31 void 32 void
32 FileReadThread::run() 33 FileReadThread::run()
33 { 34 {
34 m_mutex.lock(); 35 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
35 36
36 while (!m_exiting) { 37 while (!m_exiting) {
37 if (m_queue.empty()) { 38 if (m_queue.empty()) {
38 m_condition.wait(&m_mutex, 1000); 39 m_condition.wait(&m_mutex, 1000);
39 } else { 40 } else {
41 } 42 }
42 notifyCancelled(); 43 notifyCancelled();
43 } 44 }
44 45
45 notifyCancelled(); 46 notifyCancelled();
46 m_mutex.unlock();
47 47
48 #ifdef DEBUG_FILE_READ_THREAD 48 #ifdef DEBUG_FILE_READ_THREAD
49 std::cerr << "FileReadThread::run() exiting" << std::endl; 49 std::cerr << "FileReadThread::run() exiting" << std::endl;
50 #endif 50 #endif
51 } 51 }
55 { 55 {
56 #ifdef DEBUG_FILE_READ_THREAD 56 #ifdef DEBUG_FILE_READ_THREAD
57 std::cerr << "FileReadThread::finish()" << std::endl; 57 std::cerr << "FileReadThread::finish()" << std::endl;
58 #endif 58 #endif
59 59
60 m_mutex.lock(); 60 {
61 while (!m_queue.empty()) { 61 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; 62
63 m_newlyCancelled.insert(m_queue.begin()->first); 63 while (!m_queue.empty()) {
64 m_queue.erase(m_queue.begin()); 64 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
65 } 65 m_newlyCancelled.insert(m_queue.begin()->first);
66 66 m_queue.erase(m_queue.begin());
67 m_exiting = true; 67 }
68 m_mutex.unlock(); 68
69 m_exiting = true;
70 }
69 71
70 m_condition.wakeAll(); 72 m_condition.wakeAll();
71 73
72 #ifdef DEBUG_FILE_READ_THREAD 74 #ifdef DEBUG_FILE_READ_THREAD
73 std::cerr << "FileReadThread::finish() exiting" << std::endl; 75 std::cerr << "FileReadThread::finish() exiting" << std::endl;
75 } 77 }
76 78
77 int 79 int
78 FileReadThread::request(const Request &request) 80 FileReadThread::request(const Request &request)
79 { 81 {
80 m_mutex.lock(); 82 int token;
81 83
82 int token = m_nextToken++; 84 {
83 m_queue[token] = request; 85 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
84 86
85 m_mutex.unlock(); 87 token = m_nextToken++;
88 m_queue[token] = request;
89 }
90
86 m_condition.wakeAll(); 91 m_condition.wakeAll();
87 92
88 return token; 93 return token;
89 } 94 }
90 95
91 void 96 void
92 FileReadThread::cancel(int token) 97 FileReadThread::cancel(int token)
93 { 98 {
94 m_mutex.lock(); 99 {
95 100 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
96 if (m_queue.find(token) != m_queue.end()) { 101
97 m_cancelledRequests[token] = m_queue[token]; 102 if (m_queue.find(token) != m_queue.end()) {
98 m_queue.erase(token); 103 m_cancelledRequests[token] = m_queue[token];
99 m_newlyCancelled.insert(token); 104 m_queue.erase(token);
100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { 105 m_newlyCancelled.insert(token);
101 m_cancelledRequests[token] = m_readyRequests[token]; 106 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
102 m_readyRequests.erase(token); 107 m_cancelledRequests[token] = m_readyRequests[token];
103 } else { 108 m_readyRequests.erase(token);
104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; 109 } else {
105 } 110 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
106 111 }
107 m_mutex.unlock(); 112 }
108 113
109 #ifdef DEBUG_FILE_READ_THREAD 114 #ifdef DEBUG_FILE_READ_THREAD
110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; 115 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
111 #endif 116 #endif
112 117
114 } 119 }
115 120
116 bool 121 bool
117 FileReadThread::isReady(int token) 122 FileReadThread::isReady(int token)
118 { 123 {
119 m_mutex.lock(); 124 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
120 125
121 bool ready = m_readyRequests.find(token) != m_readyRequests.end(); 126 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
122 127
123 m_mutex.unlock();
124 return ready; 128 return ready;
125 } 129 }
126 130
127 bool 131 bool
128 FileReadThread::isCancelled(int token) 132 FileReadThread::isCancelled(int token)
129 { 133 {
130 m_mutex.lock(); 134 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
131 135
132 bool cancelled = 136 bool cancelled =
133 m_cancelledRequests.find(token) != m_cancelledRequests.end() && 137 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
134 m_newlyCancelled.find(token) == m_newlyCancelled.end(); 138 m_newlyCancelled.find(token) == m_newlyCancelled.end();
135 139
136 m_mutex.unlock();
137 return cancelled; 140 return cancelled;
138 } 141 }
139 142
140 bool 143 bool
141 FileReadThread::getRequest(int token, Request &request) 144 FileReadThread::getRequest(int token, Request &request)
142 { 145 {
143 m_mutex.lock(); 146 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
144 147
145 bool found = false; 148 bool found = false;
146 149
147 if (m_queue.find(token) != m_queue.end()) { 150 if (m_queue.find(token) != m_queue.end()) {
148 request = m_queue[token]; 151 request = m_queue[token];
153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { 156 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
154 request = m_readyRequests[token]; 157 request = m_readyRequests[token];
155 found = true; 158 found = true;
156 } 159 }
157 160
158 m_mutex.unlock();
159
160 return found; 161 return found;
161 } 162 }
162 163
163 void 164 void
164 FileReadThread::done(int token) 165 FileReadThread::done(int token)
165 { 166 {
166 m_mutex.lock(); 167 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
167 168
168 bool found = false; 169 bool found = false;
169 170
170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { 171 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
171 m_cancelledRequests.erase(token); 172 m_cancelledRequests.erase(token);
176 found = true; 177 found = true;
177 } else if (m_queue.find(token) != m_queue.end()) { 178 } else if (m_queue.find(token) != m_queue.end()) {
178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; 179 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
179 } 180 }
180 181
181 m_mutex.unlock();
182
183 if (!found) { 182 if (!found) {
184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; 183 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
185 } 184 }
186 } 185 }
187 186
188 void 187 void
189 FileReadThread::process() 188 FileReadThread::process()
190 { 189 {
191 // entered with m_mutex locked and m_queue non-empty 190 // entered with m_mutex locked and m_queue non-empty
192 191
193 #ifdef DEBUG_FILE_READ_THREAD 192 Profiler profiler("FileReadThread::process", true);
194 Profiler profiler("FileReadThread::process()", true);
195 #endif
196 193
197 int token = m_queue.begin()->first; 194 int token = m_queue.begin()->first;
198 Request request = m_queue.begin()->second; 195 Request request = m_queue.begin()->second;
199 196
200 m_mutex.unlock(); 197 m_mutex.unlock();
205 202
206 bool successful = false; 203 bool successful = false;
207 bool seekFailed = false; 204 bool seekFailed = false;
208 ssize_t r = 0; 205 ssize_t r = 0;
209 206
210 if (request.mutex) request.mutex->lock(); 207 {
211 208 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { 209
213 seekFailed = true; 210 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
214 } else { 211 seekFailed = true;
215 212 } else {
216 // if request.size is large, we want to avoid making a single 213
217 // system call to read it all as it may block too much 214 // if request.size is large, we want to avoid making a single
218 215 // system call to read it all as it may block too much
219 static const size_t blockSize = 256 * 1024; 216
220 217 static const size_t blockSize = 256 * 1024;
221 size_t size = request.size; 218
222 char *destination = request.data; 219 size_t size = request.size;
223 220 char *destination = request.data;
224 while (size > 0) { 221
225 size_t readSize = size; 222 while (size > 0) {
226 if (readSize > blockSize) readSize = blockSize; 223 size_t readSize = size;
227 ssize_t br = ::read(request.fd, destination, readSize); 224 if (readSize > blockSize) readSize = blockSize;
228 if (br < 0) { 225 ssize_t br = ::read(request.fd, destination, readSize);
229 r = br; 226 if (br < 0) {
230 break; 227 r = br;
231 } else { 228 break;
232 r += br; 229 } else {
233 if (br < ssize_t(readSize)) break; 230 r += br;
231 if (br < ssize_t(readSize)) break;
232 }
233 destination += readSize;
234 size -= readSize;
234 } 235 }
235 destination += readSize; 236 }
236 size -= readSize; 237 }
237 }
238 }
239
240 if (request.mutex) request.mutex->unlock();
241 238
242 if (seekFailed) { 239 if (seekFailed) {
243 ::perror("Seek failed"); 240 ::perror("Seek failed");
244 std::cerr << "ERROR: FileReadThread::process: seek to " 241 std::cerr << "ERROR: FileReadThread::process: seek to "
245 << request.start << " failed" << std::endl; 242 << request.start << " failed" << std::endl;