comparison data/fileio/MatrixFile.cpp @ 537:3cc4b7cd2aa5

* Merge from one-fftdataserver-per-fftmodel branch. This bit of reworking (which is not described very accurately by the title of the branch) turns the MatrixFile object into something that either reads or writes, but not both, and separates the FFT file cache reader and writer implementations separately. This allows the FFT data server to have a single thread owning writers and one reader per "customer" thread, and for all locking to be vastly simplified and concentrated in the data server alone (because none of the classes it makes use of is used in more than one thread at a time). The result is faster and more trustworthy code.
author Chris Cannam
date Tue, 27 Jan 2009 13:25:10 +0000
parents 3e0f1f7bec85
children 95391b480e17
comparison
equal deleted inserted replaced
536:beb51f558e9c 537:3cc4b7cd2aa5
2 2
3 /* 3 /*
4 Sonic Visualiser 4 Sonic Visualiser
5 An audio file viewer and annotation editor. 5 An audio file viewer and annotation editor.
6 Centre for Digital Music, Queen Mary, University of London. 6 Centre for Digital Music, Queen Mary, University of London.
7 This file copyright 2006 Chris Cannam. 7 This file copyright 2006-2009 Chris Cannam and QMUL.
8 8
9 This program is free software; you can redistribute it and/or 9 This program is free software; you can redistribute it and/or
10 modify it under the terms of the GNU General Public License as 10 modify it under the terms of the GNU General Public License as
11 published by the Free Software Foundation; either version 2 of the 11 published by the Free Software Foundation; either version 2 of the
12 License, or (at your option) any later version. See the file 12 License, or (at your option) any later version. See the file
33 #include <cstdlib> 33 #include <cstdlib>
34 34
35 #include <QFileInfo> 35 #include <QFileInfo>
36 #include <QDir> 36 #include <QDir>
37 37
38 //#define DEBUG_MATRIX_FILE 1 38 #define DEBUG_MATRIX_FILE 1
39 //#define DEBUG_MATRIX_FILE_READ_SET 1 39 //#define DEBUG_MATRIX_FILE_READ_SET 1
40 40
41 #ifdef DEBUG_MATRIX_FILE_READ_SET 41 #ifdef DEBUG_MATRIX_FILE_READ_SET
42 #ifndef DEBUG_MATRIX_FILE 42 #ifndef DEBUG_MATRIX_FILE
43 #define DEBUG_MATRIX_FILE 1 43 #define DEBUG_MATRIX_FILE 1
44 #endif 44 #endif
45 #endif 45 #endif
46 46
47 std::map<QString, int> MatrixFile::m_refcount; 47 std::map<QString, int> MatrixFile::m_refcount;
48 QMutex MatrixFile::m_refcountMutex; 48 QMutex MatrixFile::m_createMutex;
49
50 MatrixFile::ResizeableBitsetMap MatrixFile::m_columnBitsets;
51 QMutex MatrixFile::m_columnBitsetWriteMutex;
52
53 FileReadThread *MatrixFile::m_readThread = 0;
54 49
55 static size_t totalStorage = 0; 50 static size_t totalStorage = 0;
56 static size_t totalMemory = 0;
57 static size_t totalCount = 0; 51 static size_t totalCount = 0;
52 static size_t openCount = 0;
58 53
59 MatrixFile::MatrixFile(QString fileBase, Mode mode, 54 MatrixFile::MatrixFile(QString fileBase, Mode mode,
60 size_t cellSize, bool eagerCache) : 55 size_t cellSize, size_t width, size_t height) :
61 m_fd(-1), 56 m_fd(-1),
62 m_mode(mode), 57 m_mode(mode),
63 m_flags(0), 58 m_flags(0),
64 m_fmode(0), 59 m_fmode(0),
65 m_cellSize(cellSize), 60 m_cellSize(cellSize),
66 m_width(0), 61 m_width(width),
67 m_height(0), 62 m_height(height),
68 m_headerSize(2 * sizeof(size_t)), 63 m_headerSize(2 * sizeof(size_t))
69 m_defaultCacheWidth(1024),
70 m_prevX(0),
71 m_eagerCache(eagerCache),
72 m_requestToken(-1),
73 m_spareData(0),
74 m_columnBitset(0)
75 { 64 {
76 Profiler profiler("MatrixFile::MatrixFile", true); 65 Profiler profiler("MatrixFile::MatrixFile", true);
77 66
78 if (!m_readThread) { 67 #ifdef DEBUG_MATRIX_FILE
79 m_readThread = new FileReadThread; 68 std::cerr << "MatrixFile::MatrixFile(" << fileBase.toStdString() << ", " << int(mode) << ", " << cellSize << ", " << width << ", " << height << ")" << std::endl;
80 m_readThread->start(); 69 #endif
81 } 70
82 71 QMutexLocker locker(&m_createMutex);
83 m_cache.data = 0;
84 72
85 QDir tempDir(TempDirectory::getInstance()->getPath()); 73 QDir tempDir(TempDirectory::getInstance()->getPath());
86 QString fileName(tempDir.filePath(QString("%1.mfc").arg(fileBase))); 74 QString fileName(tempDir.filePath(QString("%1.mfc").arg(fileBase)));
87 bool newFile = !QFileInfo(fileName).exists(); 75 bool newFile = !QFileInfo(fileName).exists();
88 76
90 std::cerr << "ERROR: MatrixFile::MatrixFile: Read-only mode " 78 std::cerr << "ERROR: MatrixFile::MatrixFile: Read-only mode "
91 << "specified, but cache file does not exist" << std::endl; 79 << "specified, but cache file does not exist" << std::endl;
92 throw FileNotFound(fileName); 80 throw FileNotFound(fileName);
93 } 81 }
94 82
95 if (!newFile && m_mode == ReadWrite) { 83 if (!newFile && m_mode == WriteOnly) {
96 std::cerr << "Note: MatrixFile::MatrixFile: Read/write mode " 84 std::cerr << "ERROR: MatrixFile::MatrixFile: Write-only mode "
97 << "specified, but file already exists; falling back to " 85 << "specified, but file already exists" << std::endl;
98 << "read-only mode" << std::endl; 86 throw FileOperationFailed(fileName, "create");
99 m_mode = ReadOnly;
100 }
101
102 if (!eagerCache && m_mode == ReadOnly) {
103 std::cerr << "WARNING: MatrixFile::MatrixFile: Eager cacheing not "
104 << "specified, but file is open in read-only mode -- cache "
105 << "will not be used" << std::endl;
106 } 87 }
107 88
108 m_flags = 0; 89 m_flags = 0;
109 m_fmode = S_IRUSR | S_IWUSR; 90 m_fmode = S_IRUSR | S_IWUSR;
110 91
111 if (m_mode == ReadWrite) { 92 if (m_mode == WriteOnly) {
112 m_flags = O_RDWR | O_CREAT; 93 m_flags = O_WRONLY | O_CREAT;
113 } else { 94 } else {
114 m_flags = O_RDONLY; 95 m_flags = O_RDONLY;
115 } 96 }
116 97
117 #ifdef _WIN32 98 #ifdef _WIN32
118 m_flags |= O_BINARY; 99 m_flags |= O_BINARY;
119 #endif 100 #endif
120 101
121 #ifdef DEBUG_MATRIX_FILE 102 #ifdef DEBUG_MATRIX_FILE
122 std::cerr << "MatrixFile::MatrixFile: opening " << fileName.toStdString() << "..." << std::endl; 103 std::cerr << "MatrixFile(" << this << ")::MatrixFile: opening " << fileName.toStdString() << "..." << std::endl;
123 #endif 104 #endif
124 105
125 if ((m_fd = ::open(fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) { 106 if ((m_fd = ::open(fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
126 ::perror("Open failed"); 107 ::perror("Open failed");
127 std::cerr << "ERROR: MatrixFile::MatrixFile: " 108 std::cerr << "ERROR: MatrixFile::MatrixFile: "
128 << "Failed to open cache file \"" 109 << "Failed to open cache file \""
129 << fileName.toStdString() << "\""; 110 << fileName.toStdString() << "\"";
130 if (m_mode == ReadWrite) std::cerr << " for writing"; 111 if (m_mode == WriteOnly) std::cerr << " for writing";
131 std::cerr << std::endl; 112 std::cerr << std::endl;
132 throw FailedToOpenFile(fileName); 113 throw FailedToOpenFile(fileName);
133 } 114 }
134 115
116 #ifdef DEBUG_MATRIX_FILE
117 std::cerr << "MatrixFile(" << this << ")::MatrixFile: fd is " << m_fd << std::endl;
118 #endif
119
135 if (newFile) { 120 if (newFile) {
136 resize(0, 0); // write header 121 initialise(); // write header and "unwritten" column tags
137 } else { 122 } else {
138 size_t header[2]; 123 size_t header[2];
139 if (::read(m_fd, header, 2 * sizeof(size_t)) < 0) { 124 if (::read(m_fd, header, 2 * sizeof(size_t)) < 0) {
140 ::perror("MatrixFile::MatrixFile: read failed"); 125 ::perror("MatrixFile::MatrixFile: read failed");
141 std::cerr << "ERROR: MatrixFile::MatrixFile: " 126 std::cerr << "ERROR: MatrixFile::MatrixFile: "
142 << "Failed to read header (fd " << m_fd << ", file \"" 127 << "Failed to read header (fd " << m_fd << ", file \""
143 << fileName.toStdString() << "\")" << std::endl; 128 << fileName.toStdString() << "\")" << std::endl;
144 throw FileReadFailed(fileName); 129 throw FileReadFailed(fileName);
145 } 130 }
146 m_width = header[0]; 131 if (header[0] != m_width || header[1] != m_height) {
147 m_height = header[1]; 132 std::cerr << "ERROR: MatrixFile::MatrixFile: "
148 seekTo(0, 0); 133 << "Dimensions in file header (" << header[0] << "x"
134 << header[1] << ") differ from expected dimensions "
135 << m_width << "x" << m_height << std::endl;
136 throw FailedToOpenFile(fileName);
137 }
149 } 138 }
150 139
151 m_fileName = fileName; 140 m_fileName = fileName;
152
153 {
154 MutexLocker locker
155 (&m_columnBitsetWriteMutex,
156 "MatrixFile::MatrixFile::m_columnBitsetWriteMutex");
157
158 if (m_columnBitsets.find(m_fileName) == m_columnBitsets.end()) {
159 m_columnBitsets[m_fileName] = new ResizeableBitset;
160 }
161 m_columnBitset = m_columnBitsets[m_fileName];
162 }
163
164 MutexLocker locker(&m_refcountMutex,
165 "MatrixFile::MatrixFile::m_refcountMutex");
166 ++m_refcount[fileName]; 141 ++m_refcount[fileName];
167 142
168 // std::cerr << "MatrixFile(" << this << "): fd " << m_fd << ", file " << fileName.toStdString() << ", ref " << m_refcount[fileName] << std::endl; 143 #ifdef DEBUG_MATRIX_FILE
169 144 std::cerr << "MatrixFile[" << m_fd << "]::MatrixFile: File " << fileName.toStdString() << ", ref " << m_refcount[fileName] << std::endl;
170 // std::cerr << "MatrixFile::MatrixFile: Done, size is " << "(" << m_width << ", " << m_height << ")" << std::endl; 145
146 std::cerr << "MatrixFile[" << m_fd << "]::MatrixFile: Done, size is " << "(" << m_width << ", " << m_height << ")" << std::endl;
147 #endif
171 148
172 ++totalCount; 149 ++totalCount;
173 150 ++openCount;
174 } 151 }
175 152
176 MatrixFile::~MatrixFile() 153 MatrixFile::~MatrixFile()
177 { 154 {
178 char *requestData = 0;
179
180 if (m_requestToken >= 0) {
181 FileReadThread::Request request;
182 if (m_readThread->getRequest(m_requestToken, request)) {
183 requestData = request.data;
184 }
185 m_readThread->cancel(m_requestToken);
186 }
187
188 if (requestData) free(requestData);
189 if (m_cache.data) free(m_cache.data);
190 if (m_spareData) free(m_spareData);
191
192 if (m_fd >= 0) { 155 if (m_fd >= 0) {
193 if (::close(m_fd) < 0) { 156 if (::close(m_fd) < 0) {
194 ::perror("MatrixFile::~MatrixFile: close failed"); 157 ::perror("MatrixFile::~MatrixFile: close failed");
195 } 158 }
196 } 159 }
197 160
161 QMutexLocker locker(&m_createMutex);
162
198 if (m_fileName != "") { 163 if (m_fileName != "") {
199 164
200 MutexLocker locker(&m_refcountMutex,
201 "MatrixFile::~MatrixFile::m_refcountMutex");
202
203 if (--m_refcount[m_fileName] == 0) { 165 if (--m_refcount[m_fileName] == 0) {
204 166
205 if (::unlink(m_fileName.toLocal8Bit())) { 167 if (::unlink(m_fileName.toLocal8Bit())) {
206 // ::perror("Unlink failed"); 168 std::cerr << "WARNING: MatrixFile::~MatrixFile: reference count reached 0, but failed to unlink file \"" << m_fileName.toStdString() << "\"" << std::endl;
207 // std::cerr << "WARNING: MatrixFile::~MatrixFile: reference count reached 0, but failed to unlink file \"" << m_fileName.toStdString() << "\"" << std::endl;
208 } else { 169 } else {
209 // std::cerr << "deleted " << m_fileName.toStdString() << std::endl; 170 std::cerr << "deleted " << m_fileName.toStdString() << std::endl;
210 } 171 }
211 172 }
212 MutexLocker locker2 173 }
213 (&m_columnBitsetWriteMutex, 174
214 "MatrixFile::~MatrixFile::m_columnBitsetWriteMutex"); 175 if (m_mode == WriteOnly) {
215 m_columnBitsets.erase(m_fileName); 176 totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize) + m_width);
216 delete m_columnBitset; 177 }
217 } 178 totalCount --;
218 } 179 openCount --;
180
181 #ifdef DEBUG_MATRIX_FILE
182 std::cerr << "MatrixFile[" << m_fd << "]::~MatrixFile: " << std::endl;
183 std::cerr << "MatrixFile: Total storage now " << totalStorage/1024 << "K in " << totalCount << " instances (" << openCount << " open)" << std::endl;
184 #endif
185 }
186
187 void
188 MatrixFile::initialise()
189 {
190 Profiler profiler("MatrixFile::initialise", true);
191
192 assert(m_mode == WriteOnly);
219 193
220 totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize)); 194 off_t off = m_headerSize + (m_width * m_height * m_cellSize) + m_width;
221 totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize); 195
222 totalCount --; 196 #ifdef DEBUG_MATRIX_FILE
223 197 std::cerr << "MatrixFile[" << m_fd << "]::initialise(" << m_width << ", " << m_height << "): cell size " << m_cellSize << ", header size " << m_headerSize << ", resizing file" << std::endl;
224 // std::cerr << "MatrixFile::~MatrixFile: " << std::endl; 198 #endif
225 // std::cerr << "Total storage now " << totalStorage/1024 << "K, theoretical max memory " 199
226 // << totalMemory/1024 << "K in " << totalCount << " instances" << std::endl; 200 if (::lseek(m_fd, off - 1, SEEK_SET) < 0) {
227 201 ::perror("ERROR: MatrixFile::initialise: seek to end failed");
228 } 202 throw FileOperationFailed(m_fileName, "lseek");
229 203 }
230 void 204
231 MatrixFile::resize(size_t w, size_t h) 205 unsigned char byte = 0;
232 { 206 if (::write(m_fd, &byte, 1) != 1) {
233 Profiler profiler("MatrixFile::resize", true); 207 ::perror("ERROR: MatrixFile::initialise: write at end failed");
234 208 throw FileOperationFailed(m_fileName, "write");
235 assert(m_mode == ReadWrite); 209 }
236 210
237 MutexLocker locker(&m_fdMutex, "MatrixFile::resize::m_fdMutex"); 211 if (::lseek(m_fd, 0, SEEK_SET) < 0) {
238
239 totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize));
240 totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize);
241
242 off_t off = m_headerSize + (w * h * m_cellSize);
243
244 #ifdef DEBUG_MATRIX_FILE
245 std::cerr << "MatrixFile::resize(" << w << ", " << h << "): resizing file" << std::endl;
246 #endif
247
248 if (w * h < m_width * m_height) {
249 if (::ftruncate(m_fd, off) < 0) {
250 ::perror("WARNING: MatrixFile::resize: ftruncate failed");
251 throw FileOperationFailed(m_fileName, "ftruncate");
252 }
253 }
254
255 m_width = 0;
256 m_height = 0;
257
258 if (::lseek(m_fd, 0, SEEK_SET) == (off_t)-1) {
259 ::perror("ERROR: MatrixFile::resize: Seek to write header failed"); 212 ::perror("ERROR: MatrixFile::resize: Seek to write header failed");
260 throw FileOperationFailed(m_fileName, "lseek"); 213 throw FileOperationFailed(m_fileName, "lseek");
261 } 214 }
262 215
263 size_t header[2]; 216 size_t header[2];
264 header[0] = w; 217 header[0] = m_width;
265 header[1] = h; 218 header[1] = m_height;
266 if (::write(m_fd, header, 2 * sizeof(size_t)) != 2 * sizeof(size_t)) { 219 if (::write(m_fd, header, 2 * sizeof(size_t)) != 2 * sizeof(size_t)) {
267 ::perror("ERROR: MatrixFile::resize: Failed to write header"); 220 ::perror("ERROR: MatrixFile::resize: Failed to write header");
268 throw FileOperationFailed(m_fileName, "write"); 221 throw FileOperationFailed(m_fileName, "write");
269 } 222 }
270 223
271 if (w > 0 && m_defaultCacheWidth > w) { 224 if (m_mode == WriteOnly) {
272 m_defaultCacheWidth = w; 225 totalStorage += (m_headerSize + (m_width * m_height * m_cellSize) + m_width);
273 } 226 }
274 227
275 //!!! static size_t maxCacheMB = 16; 228 #ifdef DEBUG_MATRIX_FILE
276 static size_t maxCacheMB = 4; 229 std::cerr << "MatrixFile[" << m_fd << "]::resize(" << m_width << ", " << m_height << "): storage "
277 if (2 * m_defaultCacheWidth * h * m_cellSize > maxCacheMB * 1024 * 1024) { //!!! 230 << (m_headerSize + m_width * m_height * m_cellSize + m_width) << std::endl;
278 m_defaultCacheWidth = (maxCacheMB * 1024 * 1024) / (2 * h * m_cellSize); 231
279 if (m_defaultCacheWidth < 16) m_defaultCacheWidth = 16; 232 std::cerr << "MatrixFile: Total storage " << totalStorage/1024 << "K" << std::endl;
280 }
281
282 if (m_columnBitset) {
283 MutexLocker locker(&m_columnBitsetWriteMutex,
284 "MatrixFile::resize::m_columnBitsetWriteMutex");
285 m_columnBitset->resize(w);
286 }
287
288 if (m_cache.data) {
289 free(m_cache.data);
290 m_cache.data = 0;
291 }
292
293 if (m_spareData) {
294 free(m_spareData);
295 m_spareData = 0;
296 }
297
298 m_width = w;
299 m_height = h;
300
301 totalStorage += (m_headerSize + (m_width * m_height * m_cellSize));
302 totalMemory += (2 * m_defaultCacheWidth * m_height * m_cellSize);
303
304 #ifdef DEBUG_MATRIX_FILE
305 std::cerr << "MatrixFile::resize(" << w << ", " << h << "): cache width "
306 << m_defaultCacheWidth << ", storage "
307 << (m_headerSize + w * h * m_cellSize) << ", mem "
308 << (2 * h * m_defaultCacheWidth * m_cellSize) << std::endl;
309
310 std::cerr << "Total storage " << totalStorage/1024 << "K, theoretical max memory "
311 << totalMemory/1024 << "K in " << totalCount << " instances" << std::endl;
312 #endif 233 #endif
313 234
314 seekTo(0, 0); 235 seekTo(0, 0);
315 } 236 }
316 237
317 void 238 void
318 MatrixFile::reset() 239 MatrixFile::close()
319 { 240 {
320 Profiler profiler("MatrixFile::reset", true); 241 #ifdef DEBUG_MATRIX_FILE
321 242 std::cerr << "MatrixFile::close()" << std::endl;
322 assert (m_mode == ReadWrite); 243 #endif
323 244 if (m_fd >= 0) {
324 if (m_eagerCache) { 245 if (::close(m_fd) < 0) {
325 void *emptyCol = calloc(m_height, m_cellSize); 246 ::perror("MatrixFile::close: close failed");
326 for (size_t x = 0; x < m_width; ++x) setColumnAt(x, emptyCol); 247 }
327 free(emptyCol); 248 m_fd = -1;
328 } 249 -- openCount;
329
330 if (m_columnBitset) {
331 MutexLocker locker(&m_columnBitsetWriteMutex,
332 "MatrixFile::reset::m_columnBitsetWriteMutex");
333 m_columnBitset->resize(m_width);
334 } 250 }
335 } 251 }
336 252
337 void 253 void
338 MatrixFile::getColumnAt(size_t x, void *data) 254 MatrixFile::getColumnAt(size_t x, void *data)
339 { 255 {
256 assert(m_mode == ReadOnly);
257
258 #ifdef DEBUG_MATRIX_FILE_READ_SET
259 std::cerr << "MatrixFile[" << m_fd << "]::getColumnAt(" << x << ")" << std::endl;
260 #endif
261
340 Profiler profiler("MatrixFile::getColumnAt"); 262 Profiler profiler("MatrixFile::getColumnAt");
341 263
342 // assert(haveSetColumnAt(x)); 264 unsigned char set = 0;
343 265 if (!seekTo(x, 0)) {
344 if (getFromCache(x, 0, m_height, data)) return; 266 std::cerr << "ERROR: MatrixFile::getColumnAt(" << x << "): Seek failed" << std::endl;
345 267 throw FileOperationFailed(m_fileName, "seek");
346 Profiler profiler2("MatrixFile::getColumnAt (uncached)"); 268 }
347 269
348 ssize_t r = 0; 270 ssize_t r = -1;
349 271 r = ::read(m_fd, &set, 1);
350 #ifdef DEBUG_MATRIX_FILE
351 std::cerr << "MatrixFile::getColumnAt(" << x << ")"
352 << ": reading the slow way";
353
354 if (m_requestToken >= 0 &&
355 x >= m_requestingX &&
356 x < m_requestingX + m_requestingWidth) {
357
358 std::cerr << " (awaiting " << m_requestingX << ", " << m_requestingWidth << " from disk)";
359 }
360
361 std::cerr << std::endl;
362 #endif
363
364 {
365 MutexLocker locker(&m_fdMutex, "MatrixFile::getColumnAt::m_fdMutex");
366
367 if (seekTo(x, 0)) {
368 r = ::read(m_fd, data, m_height * m_cellSize);
369 }
370 }
371
372 if (r < 0) { 272 if (r < 0) {
373 ::perror("MatrixFile::getColumnAt: read failed"); 273 ::perror("MatrixFile::getColumnAt: read failed");
374 std::cerr << "ERROR: MatrixFile::getColumnAt: "
375 << "Failed to read column " << x << " (height " << m_height << ", cell size " << m_cellSize << ", fd " << m_fd << ", file \""
376 << m_fileName.toStdString() << "\")" << std::endl;
377 throw FileReadFailed(m_fileName); 274 throw FileReadFailed(m_fileName);
378 } 275 }
379 276 if (!set) {
380 return; 277 std::cerr << "MatrixFile[" << m_fd << "]::getColumnAt(" << x << "): Column has not been set" << std::endl;
278 return;
279 }
280
281 r = ::read(m_fd, data, m_height * m_cellSize);
282 if (r < 0) {
283 ::perror("MatrixFile::getColumnAt: read failed");
284 throw FileReadFailed(m_fileName);
285 }
381 } 286 }
382 287
383 bool 288 bool
384 MatrixFile::getFromCache(size_t x, size_t ystart, size_t ycount, void *data) 289 MatrixFile::haveSetColumnAt(size_t x) const
385 { 290 {
386 bool fail = false; 291 assert(m_mode == ReadOnly);
387 bool primeLeft = false; 292
388 293 Profiler profiler("MatrixFile::haveSetColumnAt");
389 { 294
390 MutexLocker locker(&m_cacheMutex, 295 #ifdef DEBUG_MATRIX_FILE_READ_SET
391 "MatrixFile::getFromCache::m_cacheMutex"); 296 std::cerr << "MatrixFile[" << m_fd << "]::haveSetColumnAt(" << x << ")" << std::endl;
392 297 // std::cerr << ".";
393 if (!m_cache.data || x < m_cache.x || x >= m_cache.x + m_cache.width) { 298 #endif
394 fail = true; 299
395 primeLeft = (m_cache.data && x < m_cache.x); 300 unsigned char set = 0;
396 } else { 301 if (!seekTo(x, 0)) {
397 memcpy(data, 302 std::cerr << "ERROR: MatrixFile::haveSetColumnAt(" << x << "): Seek failed" << std::endl;
398 m_cache.data + m_cellSize * ((x - m_cache.x) * m_height + ystart), 303 throw FileOperationFailed(m_fileName, "seek");
399 ycount * m_cellSize); 304 }
400 } 305
401 } 306 ssize_t r = -1;
402 307 r = ::read(m_fd, &set, 1);
403 if (fail) { 308 if (r < 0) {
404 primeCache(x, primeLeft); // this doesn't take effect until a later callback 309 ::perror("MatrixFile::haveSetColumnAt: read failed");
405 m_prevX = x; 310 throw FileReadFailed(m_fileName);
406 return false; 311 }
407 } 312
408 313 return set;
409 if (m_cache.x > 0 && x < m_prevX && x < m_cache.x + m_cache.width/4) {
410 primeCache(x, true);
411 }
412
413 if (m_cache.x + m_cache.width < m_width &&
414 x > m_prevX &&
415 x > m_cache.x + (m_cache.width * 3) / 4) {
416 primeCache(x, false);
417 }
418
419 m_prevX = x;
420 return true;
421 } 314 }
422 315
423 void 316 void
424 MatrixFile::setColumnAt(size_t x, const void *data) 317 MatrixFile::setColumnAt(size_t x, const void *data)
425 { 318 {
426 assert(m_mode == ReadWrite); 319 assert(m_mode == WriteOnly);
427 320
428 #ifdef DEBUG_MATRIX_FILE_READ_SET 321 #ifdef DEBUG_MATRIX_FILE_READ_SET
429 // std::cerr << "MatrixFile::setColumnAt(" << x << ")" << std::endl; 322 std::cerr << "MatrixFile[" << m_fd << "]::setColumnAt(" << x << ")" << std::endl;
430 std::cerr << "."; 323 // std::cerr << ".";
431 #endif 324 #endif
432 325
433 ssize_t w = 0; 326 ssize_t w = 0;
434 bool seekFailed = false; 327
435 328 if (!seekTo(x, 0)) {
436 { 329 std::cerr << "ERROR: MatrixFile::setColumnAt(" << x << "): Seek failed" << std::endl;
437 MutexLocker locker(&m_fdMutex, "MatrixFile::setColumnAt::m_fdMutex");
438
439 if (seekTo(x, 0)) {
440 w = ::write(m_fd, data, m_height * m_cellSize);
441 } else {
442 seekFailed = true;
443 }
444 }
445
446 if (!seekFailed && w != ssize_t(m_height * m_cellSize)) {
447 ::perror("WARNING: MatrixFile::setColumnAt: write failed");
448 throw FileOperationFailed(m_fileName, "write");
449 } else if (seekFailed) {
450 throw FileOperationFailed(m_fileName, "seek"); 330 throw FileOperationFailed(m_fileName, "seek");
451 } else { 331 }
452 MutexLocker locker 332
453 (&m_columnBitsetWriteMutex, 333 unsigned char set = 0;
454 "MatrixFile::setColumnAt::m_columnBitsetWriteMutex"); 334 w = ::write(m_fd, &set, 1);
455 m_columnBitset->set(x); 335 if (w != 1) {
456 } 336 ::perror("WARNING: MatrixFile::setColumnAt: write failed (1)");
457 } 337 throw FileOperationFailed(m_fileName, "write");
458 338 }
459 void 339
460 MatrixFile::suspend() 340 w = ::write(m_fd, data, m_height * m_cellSize);
461 { 341 if (w != ssize_t(m_height * m_cellSize)) {
462 MutexLocker locker(&m_fdMutex, "MatrixFile::suspend::m_fdMutex"); 342 ::perror("WARNING: MatrixFile::setColumnAt: write failed (2)");
463 MutexLocker locker2(&m_cacheMutex, "MatrixFile::suspend::m_cacheMutex"); 343 throw FileOperationFailed(m_fileName, "write");
464 344 }
465 if (m_fd < 0) return; // already suspended 345 /*
466 346 if (x == 0) {
467 #ifdef DEBUG_MATRIX_FILE 347 std::cerr << "Wrote " << m_height * m_cellSize << " bytes, as follows:" << std::endl;
468 std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::suspend(): fd was " << m_fd << std::endl; 348 for (int i = 0; i < m_height * m_cellSize; ++i) {
469 #endif 349 std::cerr << (int)(((char *)data)[i]) << " ";
470 350 }
471 if (m_requestToken >= 0) {
472 void *data = 0;
473 FileReadThread::Request request;
474 if (m_readThread->getRequest(m_requestToken, request)) {
475 data = request.data;
476 }
477 m_readThread->cancel(m_requestToken);
478 if (data) free(data);
479 m_requestToken = -1;
480 }
481
482 if (m_cache.data) {
483 free(m_cache.data);
484 m_cache.data = 0;
485 }
486
487 if (m_spareData) {
488 free(m_spareData);
489 m_spareData = 0;
490 }
491
492 if (::close(m_fd) < 0) {
493 ::perror("WARNING: MatrixFile::suspend: close failed");
494 throw FileOperationFailed(m_fileName, "close");
495 }
496
497 m_fd = -1;
498 }
499
500 void
501 MatrixFile::resume()
502 {
503 if (m_fd >= 0) return;
504
505 #ifdef DEBUG_MATRIX_FILE
506 std::cerr << "MatrixFile(" << this << ")::resume()" << std::endl;
507 #endif
508
509 if ((m_fd = ::open(m_fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
510 ::perror("Open failed");
511 std::cerr << "ERROR: MatrixFile::resume: "
512 << "Failed to open cache file \""
513 << m_fileName.toStdString() << "\"";
514 if (m_mode == ReadWrite) std::cerr << " for writing";
515 std::cerr << std::endl; 351 std::cerr << std::endl;
516 throw FailedToOpenFile(m_fileName); 352 }
517 } 353 */
518 354 if (!seekTo(x, 0)) {
519 #ifdef DEBUG_MATRIX_FILE 355 std::cerr << "MatrixFile[" << m_fd << "]::setColumnAt(" << x << "): Seek failed" << std::endl;
520 std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::resume(): fd is " << m_fd << std::endl; 356 throw FileOperationFailed(m_fileName, "seek");
521 #endif 357 }
522 } 358
523 359 set = 1;
524 void 360 w = ::write(m_fd, &set, 1);
525 MatrixFile::primeCache(size_t x, bool goingLeft) 361 if (w != 1) {
526 { 362 ::perror("WARNING: MatrixFile::setColumnAt: write failed (3)");
527 // Profiler profiler("MatrixFile::primeCache"); 363 throw FileOperationFailed(m_fileName, "write");
528 364 }
529 #ifdef DEBUG_MATRIX_FILE_READ_SET 365 }
530 std::cerr << "MatrixFile::primeCache(" << x << ", " << goingLeft << ")" << std::endl; 366
531 #endif 367 bool
532 368 MatrixFile::seekTo(size_t x, size_t y) const
533 size_t rx = x; 369 {
534 size_t rw = m_defaultCacheWidth;
535
536 size_t left = rw / 3;
537 if (goingLeft) left = (rw * 2) / 3;
538
539 if (rx > left) rx -= left;
540 else rx = 0;
541
542 if (rx + rw > m_width) rw = m_width - rx;
543
544 if (!m_eagerCache) {
545
546 size_t ti = 0;
547
548 for (ti = 0; ti < rw; ++ti) {
549 if (!m_columnBitset->get(rx + ti)) break;
550 }
551
552 #ifdef DEBUG_MATRIX_FILE
553 if (ti < rw) {
554 std::cerr << "eagerCache is false and there's a hole at "
555 << rx + ti << ", reducing rw from " << rw << " to "
556 << ti << std::endl;
557 }
558 #endif
559
560 rw = std::min(rw, ti);
561 if (rw < 10 || rx + rw <= x) return;
562 }
563
564 MutexLocker locker(&m_cacheMutex, "MatrixFile::primeCache::m_cacheMutex");
565
566 // Check for the existence of the request first; if it exists,
567 // check whether it's ready. Only when we know it's ready do we
568 // retrieve the actual request, because the reason we need the
569 // request is to check whether it was successful or not and
570 // extract data from it, and none of that can be relied upon if we
571 // retrieve the request before it's ready. (There used to be a
572 // race condition here, where we retrieved the request and only
573 // afterwards checked the ready status, pulling data from the
574 // request if it was found to be ready then.)
575
576 if (m_requestToken >= 0 &&
577 m_readThread->haveRequest(m_requestToken)) {
578
579 if (x >= m_requestingX &&
580 x < m_requestingX + m_requestingWidth) {
581
582 if (m_readThread->isReady(m_requestToken)) {
583
584 FileReadThread::Request request;
585 if (!m_readThread->getRequest(m_requestToken, request)) {
586 std::cerr << "ERROR: MatrixFile::primeCache: File read thread has lost our request!" << std::endl;
587 throw FileReadFailed(m_fileName);
588 }
589
590 if (!request.successful) {
591 std::cerr << "ERROR: MatrixFile::primeCache: Last request was unsuccessful" << std::endl;
592 throw FileReadFailed(m_fileName);
593 }
594
595 #ifdef DEBUG_MATRIX_FILE_READ_SET
596 std::cerr << "last request is ready! (" << m_requestingX << ", "<< m_requestingWidth << ")" << std::endl;
597 #endif
598
599 m_cache.x = (request.start - m_headerSize) / (m_height * m_cellSize);
600 m_cache.width = request.size / (m_height * m_cellSize);
601
602 #ifdef DEBUG_MATRIX_FILE_READ_SET
603 std::cerr << "received last request: actual size is: " << m_cache.x << ", " << m_cache.width << std::endl;
604 #endif
605
606 if (m_cache.data) {
607 if (m_spareData) {
608 // std::cerr << this << ": Freeing spare data" << std::endl;
609 free(m_spareData);
610 }
611 // std::cerr << this << ": Moving old cache data to spare" << std::endl;
612 m_spareData = m_cache.data;
613 }
614 // std::cerr << this << ": Moving request data to cache" << std::endl;
615 m_cache.data = request.data;
616
617 m_readThread->done(m_requestToken);
618 m_requestToken = -1;
619 }
620
621 // already requested something covering this area; wait for it
622 return;
623 }
624
625 FileReadThread::Request dud;
626
627 if (!m_readThread->getRequest(m_requestToken, dud)) {
628
629 std::cerr << "ERROR: MatrixFile::primeCache: Inconsistent replies from FileReadThread" << std::endl;
630
631 } else {
632
633 // current request is for the wrong area, so no longer of any use
634 m_readThread->cancel(m_requestToken);
635
636 // crude way to avoid leaking the data
637 while (!m_readThread->isCancelled(m_requestToken)) {
638 usleep(10000);
639 }
640
641 #ifdef DEBUG_MATRIX_FILE_READ_SET
642 std::cerr << "cancelled " << m_requestToken << std::endl;
643 #endif
644
645 if (m_spareData) {
646 free(m_spareData);
647 }
648 m_spareData = dud.data;
649 m_readThread->done(m_requestToken);
650 }
651
652 m_requestToken = -1;
653 }
654
655 if (m_fd < 0) { 370 if (m_fd < 0) {
656 MutexLocker locker(&m_fdMutex, "MatrixFile::primeCache::m_fdMutex"); 371 std::cerr << "ERROR: MatrixFile::seekTo: File not open" << std::endl;
657 if (m_fd < 0) resume(); 372 return false;
658 } 373 }
659 374
660 FileReadThread::Request request; 375 off_t off = m_headerSize + x * m_height * m_cellSize + x + y * m_cellSize;
661 376
662 request.fd = m_fd; 377 #ifdef DEBUG_MATRIX_FILE_READ_SET
663 request.mutex = &m_fdMutex; 378 std::cerr << "MatrixFile[" << m_fd << "]::seekTo(" << x << "," << y << "): off = " << off << std::endl;
664 request.start = m_headerSize + rx * m_height * m_cellSize; 379 #endif
665 request.size = rw * m_height * m_cellSize;
666
667 // std::cerr << this << ": Moving spare data to request, and resizing to " << rw * m_height * m_cellSize << std::endl;
668
669 request.data = (char *)realloc(m_spareData, rw * m_height * m_cellSize);
670 MUNLOCK(request.data, rw * m_height * m_cellSize);
671 m_spareData = 0;
672
673 m_requestingX = rx;
674 m_requestingWidth = rw;
675
676 int token = m_readThread->request(request);
677 #ifdef DEBUG_MATRIX_FILE_READ_SET
678 std::cerr << "MatrixFile::primeCache: request token is "
679 << token << " (x = [" << rx << "], w = [" << rw << "], left = [" << goingLeft << "])" << std::endl;
680 #endif
681 m_requestToken = token;
682 }
683
684 bool
685 MatrixFile::seekTo(size_t x, size_t y)
686 {
687 if (m_fd < 0) resume();
688
689 off_t off = m_headerSize + (x * m_height + y) * m_cellSize;
690 380
691 if (::lseek(m_fd, off, SEEK_SET) == (off_t)-1) { 381 if (::lseek(m_fd, off, SEEK_SET) == (off_t)-1) {
692 ::perror("Seek failed"); 382 ::perror("Seek failed");
693 std::cerr << "ERROR: MatrixFile::seekTo(" << x << ", " << y 383 std::cerr << "ERROR: MatrixFile::seekTo(" << x << ", " << y
694 << ") failed" << std::endl; 384 << ") = " << off << " failed" << std::endl;
695 return false; 385 return false;
696 } 386 }
697 387
698 return true; 388 return true;
699 } 389 }