Mercurial > hg > svcore
comparison data/fileio/FileReadThread.cpp @ 408:115f60df1e4d
* Speed up spectrogram painting by releasing mutex in FFTDataServer
while calculating data prior to writing it, and by adding whole-column
value query methods to FFT objects
* Add paint cache to Thumbwheel -- repaints of this widget were slowing
down the whole spectrogram repaint
* Use MutexLocker in more places (named lockers, with debug support) and
add more profiling points
* Make startup much quicker some of the time, with OSC server in place
author | Chris Cannam |
---|---|
date | Thu, 08 May 2008 14:46:22 +0000 |
parents | 1a42221a1522 |
children | 1f15beefcd76 |
Comparison legend: equal, deleted, inserted, replaced
407:88ad01799040 | 408:115f60df1e4d |
---|---|
14 */ | 14 */ |
15 | 15 |
16 #include "FileReadThread.h" | 16 #include "FileReadThread.h" |
17 | 17 |
18 #include "base/Profiler.h" | 18 #include "base/Profiler.h" |
19 #include "base/Thread.h" | |
19 | 20 |
20 #include <iostream> | 21 #include <iostream> |
21 #include <unistd.h> | 22 #include <unistd.h> |
22 | 23 |
23 //#define DEBUG_FILE_READ_THREAD 1 | 24 //#define DEBUG_FILE_READ_THREAD 1 |
29 } | 30 } |
30 | 31 |
31 void | 32 void |
32 FileReadThread::run() | 33 FileReadThread::run() |
33 { | 34 { |
34 m_mutex.lock(); | 35 MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex"); |
35 | 36 |
36 while (!m_exiting) { | 37 while (!m_exiting) { |
37 if (m_queue.empty()) { | 38 if (m_queue.empty()) { |
38 m_condition.wait(&m_mutex, 1000); | 39 m_condition.wait(&m_mutex, 1000); |
39 } else { | 40 } else { |
41 } | 42 } |
42 notifyCancelled(); | 43 notifyCancelled(); |
43 } | 44 } |
44 | 45 |
45 notifyCancelled(); | 46 notifyCancelled(); |
46 m_mutex.unlock(); | |
47 | 47 |
48 #ifdef DEBUG_FILE_READ_THREAD | 48 #ifdef DEBUG_FILE_READ_THREAD |
49 std::cerr << "FileReadThread::run() exiting" << std::endl; | 49 std::cerr << "FileReadThread::run() exiting" << std::endl; |
50 #endif | 50 #endif |
51 } | 51 } |
55 { | 55 { |
56 #ifdef DEBUG_FILE_READ_THREAD | 56 #ifdef DEBUG_FILE_READ_THREAD |
57 std::cerr << "FileReadThread::finish()" << std::endl; | 57 std::cerr << "FileReadThread::finish()" << std::endl; |
58 #endif | 58 #endif |
59 | 59 |
60 m_mutex.lock(); | 60 { |
61 while (!m_queue.empty()) { | 61 MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex"); |
62 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; | 62 |
63 m_newlyCancelled.insert(m_queue.begin()->first); | 63 while (!m_queue.empty()) { |
64 m_queue.erase(m_queue.begin()); | 64 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second; |
65 } | 65 m_newlyCancelled.insert(m_queue.begin()->first); |
66 | 66 m_queue.erase(m_queue.begin()); |
67 m_exiting = true; | 67 } |
68 m_mutex.unlock(); | 68 |
69 m_exiting = true; | |
70 } | |
69 | 71 |
70 m_condition.wakeAll(); | 72 m_condition.wakeAll(); |
71 | 73 |
72 #ifdef DEBUG_FILE_READ_THREAD | 74 #ifdef DEBUG_FILE_READ_THREAD |
73 std::cerr << "FileReadThread::finish() exiting" << std::endl; | 75 std::cerr << "FileReadThread::finish() exiting" << std::endl; |
75 } | 77 } |
76 | 78 |
77 int | 79 int |
78 FileReadThread::request(const Request &request) | 80 FileReadThread::request(const Request &request) |
79 { | 81 { |
80 m_mutex.lock(); | 82 int token; |
81 | 83 |
82 int token = m_nextToken++; | 84 { |
83 m_queue[token] = request; | 85 MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex"); |
84 | 86 |
85 m_mutex.unlock(); | 87 token = m_nextToken++; |
88 m_queue[token] = request; | |
89 } | |
90 | |
86 m_condition.wakeAll(); | 91 m_condition.wakeAll(); |
87 | 92 |
88 return token; | 93 return token; |
89 } | 94 } |
90 | 95 |
91 void | 96 void |
92 FileReadThread::cancel(int token) | 97 FileReadThread::cancel(int token) |
93 { | 98 { |
94 m_mutex.lock(); | 99 { |
95 | 100 MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex"); |
96 if (m_queue.find(token) != m_queue.end()) { | 101 |
97 m_cancelledRequests[token] = m_queue[token]; | 102 if (m_queue.find(token) != m_queue.end()) { |
98 m_queue.erase(token); | 103 m_cancelledRequests[token] = m_queue[token]; |
99 m_newlyCancelled.insert(token); | 104 m_queue.erase(token); |
100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { | 105 m_newlyCancelled.insert(token); |
101 m_cancelledRequests[token] = m_readyRequests[token]; | 106 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { |
102 m_readyRequests.erase(token); | 107 m_cancelledRequests[token] = m_readyRequests[token]; |
103 } else { | 108 m_readyRequests.erase(token); |
104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; | 109 } else { |
105 } | 110 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl; |
106 | 111 } |
107 m_mutex.unlock(); | 112 } |
108 | 113 |
109 #ifdef DEBUG_FILE_READ_THREAD | 114 #ifdef DEBUG_FILE_READ_THREAD |
110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; | 115 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl; |
111 #endif | 116 #endif |
112 | 117 |
114 } | 119 } |
115 | 120 |
116 bool | 121 bool |
117 FileReadThread::isReady(int token) | 122 FileReadThread::isReady(int token) |
118 { | 123 { |
119 m_mutex.lock(); | 124 MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex"); |
120 | 125 |
121 bool ready = m_readyRequests.find(token) != m_readyRequests.end(); | 126 bool ready = m_readyRequests.find(token) != m_readyRequests.end(); |
122 | 127 |
123 m_mutex.unlock(); | |
124 return ready; | 128 return ready; |
125 } | 129 } |
126 | 130 |
127 bool | 131 bool |
128 FileReadThread::isCancelled(int token) | 132 FileReadThread::isCancelled(int token) |
129 { | 133 { |
130 m_mutex.lock(); | 134 MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex"); |
131 | 135 |
132 bool cancelled = | 136 bool cancelled = |
133 m_cancelledRequests.find(token) != m_cancelledRequests.end() && | 137 m_cancelledRequests.find(token) != m_cancelledRequests.end() && |
134 m_newlyCancelled.find(token) == m_newlyCancelled.end(); | 138 m_newlyCancelled.find(token) == m_newlyCancelled.end(); |
135 | 139 |
136 m_mutex.unlock(); | |
137 return cancelled; | 140 return cancelled; |
138 } | 141 } |
139 | 142 |
140 bool | 143 bool |
141 FileReadThread::getRequest(int token, Request &request) | 144 FileReadThread::getRequest(int token, Request &request) |
142 { | 145 { |
143 m_mutex.lock(); | 146 MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex"); |
144 | 147 |
145 bool found = false; | 148 bool found = false; |
146 | 149 |
147 if (m_queue.find(token) != m_queue.end()) { | 150 if (m_queue.find(token) != m_queue.end()) { |
148 request = m_queue[token]; | 151 request = m_queue[token]; |
153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { | 156 } else if (m_readyRequests.find(token) != m_readyRequests.end()) { |
154 request = m_readyRequests[token]; | 157 request = m_readyRequests[token]; |
155 found = true; | 158 found = true; |
156 } | 159 } |
157 | 160 |
158 m_mutex.unlock(); | |
159 | |
160 return found; | 161 return found; |
161 } | 162 } |
162 | 163 |
163 void | 164 void |
164 FileReadThread::done(int token) | 165 FileReadThread::done(int token) |
165 { | 166 { |
166 m_mutex.lock(); | 167 MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex"); |
167 | 168 |
168 bool found = false; | 169 bool found = false; |
169 | 170 |
170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { | 171 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) { |
171 m_cancelledRequests.erase(token); | 172 m_cancelledRequests.erase(token); |
176 found = true; | 177 found = true; |
177 } else if (m_queue.find(token) != m_queue.end()) { | 178 } else if (m_queue.find(token) != m_queue.end()) { |
178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; | 179 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl; |
179 } | 180 } |
180 | 181 |
181 m_mutex.unlock(); | |
182 | |
183 if (!found) { | 182 if (!found) { |
184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; | 183 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl; |
185 } | 184 } |
186 } | 185 } |
187 | 186 |
188 void | 187 void |
189 FileReadThread::process() | 188 FileReadThread::process() |
190 { | 189 { |
191 // entered with m_mutex locked and m_queue non-empty | 190 // entered with m_mutex locked and m_queue non-empty |
192 | 191 |
193 #ifdef DEBUG_FILE_READ_THREAD | 192 Profiler profiler("FileReadThread::process", true); |
194 Profiler profiler("FileReadThread::process()", true); | |
195 #endif | |
196 | 193 |
197 int token = m_queue.begin()->first; | 194 int token = m_queue.begin()->first; |
198 Request request = m_queue.begin()->second; | 195 Request request = m_queue.begin()->second; |
199 | 196 |
200 m_mutex.unlock(); | 197 m_mutex.unlock(); |
205 | 202 |
206 bool successful = false; | 203 bool successful = false; |
207 bool seekFailed = false; | 204 bool seekFailed = false; |
208 ssize_t r = 0; | 205 ssize_t r = 0; |
209 | 206 |
210 if (request.mutex) request.mutex->lock(); | 207 { |
211 | 208 MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex"); |
212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { | 209 |
213 seekFailed = true; | 210 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) { |
214 } else { | 211 seekFailed = true; |
215 | 212 } else { |
216 // if request.size is large, we want to avoid making a single | 213 |
217 // system call to read it all as it may block too much | 214 // if request.size is large, we want to avoid making a single |
218 | 215 // system call to read it all as it may block too much |
219 static const size_t blockSize = 256 * 1024; | 216 |
220 | 217 static const size_t blockSize = 256 * 1024; |
221 size_t size = request.size; | 218 |
222 char *destination = request.data; | 219 size_t size = request.size; |
223 | 220 char *destination = request.data; |
224 while (size > 0) { | 221 |
225 size_t readSize = size; | 222 while (size > 0) { |
226 if (readSize > blockSize) readSize = blockSize; | 223 size_t readSize = size; |
227 ssize_t br = ::read(request.fd, destination, readSize); | 224 if (readSize > blockSize) readSize = blockSize; |
228 if (br < 0) { | 225 ssize_t br = ::read(request.fd, destination, readSize); |
229 r = br; | 226 if (br < 0) { |
230 break; | 227 r = br; |
231 } else { | 228 break; |
232 r += br; | 229 } else { |
233 if (br < ssize_t(readSize)) break; | 230 r += br; |
231 if (br < ssize_t(readSize)) break; | |
232 } | |
233 destination += readSize; | |
234 size -= readSize; | |
234 } | 235 } |
235 destination += readSize; | 236 } |
236 size -= readSize; | 237 } |
237 } | |
238 } | |
239 | |
240 if (request.mutex) request.mutex->unlock(); | |
241 | 238 |
242 if (seekFailed) { | 239 if (seekFailed) { |
243 ::perror("Seek failed"); | 240 ::perror("Seek failed"); |
244 std::cerr << "ERROR: FileReadThread::process: seek to " | 241 std::cerr << "ERROR: FileReadThread::process: seek to " |
245 << request.start << " failed" << std::endl; | 242 << request.start << " failed" << std::endl; |