FileReadThread.cpp
Go to the documentation of this file.
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
2 
3 /*
4  Sonic Visualiser
5  An audio file viewer and annotation editor.
6  Centre for Digital Music, Queen Mary, University of London.
7  This file copyright 2006 Chris Cannam.
8 
9  This program is free software; you can redistribute it and/or
10  modify it under the terms of the GNU General Public License as
11  published by the Free Software Foundation; either version 2 of the
12  License, or (at your option) any later version. See the file
13  COPYING included with this distribution for more information.
14 */
15 
16 #include "FileReadThread.h"
17 
18 #include "base/Profiler.h"
19 #include "base/Thread.h"
20 
21 #include <iostream>
22 
23 #ifdef _MSC_VER
24 #include <io.h>
25 #define _lseek lseek
26 #else
27 #include <unistd.h>
28 #endif
29 
30 #include <cstdio>
31 
32 //#define DEBUG_FILE_READ_THREAD 1
33 
// Tail of a constructor definition: the line that opens it is not present
// in this listing. The visible initialisers start the token counter at 0
// and the exit flag at false; the constructor body is empty.
35  m_nextToken(0),
36  m_exiting(false)
37 {
38 }
39 
// Worker loop (identified as FileReadThread::run by its lock label and
// debug message). Takes m_mutex through a scoped MutexLocker, then loops
// until m_exiting becomes true.
//
// NOTE(review): the listing numbering skips 41, 51 and 54 inside this
// definition, so statements may be missing from this extract -- confirm
// against the original file before relying on the body shown here.
40 void
42 {
43  MutexLocker locker(&m_mutex, "FileReadThread::run::m_mutex");
44 
45  while (!m_exiting) {
46  if (m_queue.empty()) {
// Nothing queued: wait on the condition, handing it m_mutex, with a
// timeout argument of 1000 (presumably milliseconds, as for
// QWaitCondition::wait -- confirm), so m_exiting is re-checked even
// if no wake-up arrives.
47  m_condition.wait(&m_mutex, 1000);
48  } else {
// At least one request is queued; process() is entered with m_mutex
// held and m_queue non-empty, as its own header comment requires.
49  process();
50  }
52  }
53 
55 
56 #ifdef DEBUG_FILE_READ_THREAD
57  SVDEBUG << "FileReadThread::run() exiting" << endl;
58 #endif
59 }
60 
// Asks the worker to stop (identified as FileReadThread::finish by its
// debug messages). Under m_mutex, every request still in m_queue is copied
// into m_cancelledRequests, its token is added to m_newlyCancelled, and the
// entry is removed from the queue; then m_exiting is set. After the lock is
// released, all waiters on m_condition are woken. This function does not
// itself wait for the worker loop to end.
61 void
63 {
64 #ifdef DEBUG_FILE_READ_THREAD
65  SVDEBUG << "FileReadThread::finish()" << endl;
66 #endif
67 
68  {
69  MutexLocker locker(&m_mutex, "FileReadThread::finish::m_mutex");
70 
// Drain the queue from the front, treating each pending request as
// cancelled.
71  while (!m_queue.empty()) {
72  m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
73  m_newlyCancelled.insert(m_queue.begin()->first);
74  m_queue.erase(m_queue.begin());
75  }
76 
77  m_exiting = true;
78  }
79 
// Wake after leaving the locked scope so a waiter can re-acquire m_mutex
// and observe m_exiting.
80  m_condition.wakeAll();
81 
82 #ifdef DEBUG_FILE_READ_THREAD
83  SVDEBUG << "FileReadThread::finish() exiting" << endl;
84 #endif
85 }
86 
// Queues a read request and returns the token that identifies it
// (identified as FileReadThread::request by its lock label). Under m_mutex
// the current value of m_nextToken is taken as the token, m_nextToken is
// incremented, and the request is stored in m_queue under that token.
// All waiters on m_condition are then woken.
//
// Returns: the token under which the request was stored.
87 int
89 {
90  int token;
91 
92  {
93  MutexLocker locker(&m_mutex, "FileReadThread::request::m_mutex");
94 
95  token = m_nextToken++;
96  m_queue[token] = request;
97  }
98 
99  m_condition.wakeAll();
100 
101  return token;
102 }
103 
// Cancels the request identified by token (identified as
// FileReadThread::cancel by its lock label and messages). Under m_mutex:
//  - if the token is still in m_queue, the request is copied to
//    m_cancelledRequests, removed from the queue, and the token is added
//    to m_newlyCancelled;
//  - otherwise, if it is in m_readyRequests, it is moved to
//    m_cancelledRequests (the token is NOT added to m_newlyCancelled);
//  - otherwise a warning is written to cerr.
// In every case all waiters on m_condition are woken afterwards.
104 void
106 {
107  {
108  MutexLocker locker(&m_mutex, "FileReadThread::cancel::m_mutex");
109 
110  if (m_queue.find(token) != m_queue.end()) {
111  m_cancelledRequests[token] = m_queue[token];
112  m_queue.erase(token);
113  m_newlyCancelled.insert(token);
114  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
115  m_cancelledRequests[token] = m_readyRequests[token];
116  m_readyRequests.erase(token);
117  } else {
118  cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << endl;
119  }
120  }
121 
122 #ifdef DEBUG_FILE_READ_THREAD
123  SVDEBUG << "FileReadThread::cancel(" << token << ") waking condition" << endl;
124 #endif
125 
126  m_condition.wakeAll();
127 }
128 
// Reports whether the request identified by token is currently held in
// m_readyRequests (identified as FileReadThread::isReady by its lock
// label). The lookup is made with m_mutex held.
//
// Returns: true if the token is present in m_readyRequests, false otherwise.
129 bool
131 {
132  MutexLocker locker(&m_mutex, "FileReadThread::isReady::m_mutex");
133 
134  bool ready = m_readyRequests.find(token) != m_readyRequests.end();
135 
136  return ready;
137 }
138 
// Reports whether the request identified by token counts as cancelled
// (identified as FileReadThread::isCancelled by its lock label). The check
// is made with m_mutex held.
//
// Returns: true only if the token is present in m_cancelledRequests AND
// absent from m_newlyCancelled; a token that is still listed in
// m_newlyCancelled therefore yields false.
139 bool
141 {
142  MutexLocker locker(&m_mutex, "FileReadThread::isCancelled::m_mutex");
143 
144  bool cancelled =
145  m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
146  m_newlyCancelled.find(token) == m_newlyCancelled.end();
147 
148  return cancelled;
149 }
150 
// Reports whether the token is known at all (identified as
// FileReadThread::haveRequest by its lock label). With m_mutex held, the
// token is looked up in m_queue, then m_cancelledRequests, then
// m_readyRequests.
//
// Returns: true if the token is present in any of the three containers,
// false otherwise.
151 bool
153 {
154  MutexLocker locker(&m_mutex, "FileReadThread::haveRequest::m_mutex");
155 
156  bool found = false;
157 
158  if (m_queue.find(token) != m_queue.end()) {
159  found = true;
160  } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
161  found = true;
162  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
163  found = true;
164  }
165 
166  return found;
167 }
168 
// Copies the stored request for token into the output argument `request`
// (identified as FileReadThread::getRequest by its lock label). With
// m_mutex held, the containers are consulted in this order: m_queue,
// m_cancelledRequests, m_readyRequests; the first one containing the token
// supplies the copy.
//
// Returns: true if the token was found and `request` was assigned; false
// if it was in none of the containers, in which case `request` is left
// unmodified.
169 bool
171 {
172  MutexLocker locker(&m_mutex, "FileReadThread::getRequest::m_mutex");
173 
174  bool found = false;
175 
176  if (m_queue.find(token) != m_queue.end()) {
177  request = m_queue[token];
178  found = true;
179  } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
180  request = m_cancelledRequests[token];
181  found = true;
182  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
183  request = m_readyRequests[token];
184  found = true;
185  }
186 
187  return found;
188 }
189 
// Discards the stored record for a token the caller has finished with
// (identified as FileReadThread::done by its lock label and messages).
// With m_mutex held:
//  - a token in m_cancelledRequests is erased from there and from
//    m_newlyCancelled;
//  - otherwise a token in m_readyRequests is erased from there;
//  - otherwise, if the token is still in m_queue, a warning is written to
//    cerr and the queue entry is left in place.
// If nothing was erased, a "request not found" warning is written to cerr.
//
// NOTE(review): in the still-queued case `found` stays false, so both the
// "still in queue" and the "request not found" warnings are printed for
// the same call -- confirm whether the second message is intended there.
190 void
192 {
193  MutexLocker locker(&m_mutex, "FileReadThread::done::m_mutex");
194 
195  bool found = false;
196 
197  if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
198  m_cancelledRequests.erase(token);
199  m_newlyCancelled.erase(token);
200  found = true;
201  } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
202  m_readyRequests.erase(token);
203  found = true;
204  } else if (m_queue.find(token) != m_queue.end()) {
205  cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << endl;
206  }
207 
208  if (!found) {
209  cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << endl;
210  }
211 }
212 
// Services the request at the front of m_queue (identified as
// FileReadThread::process by its profiler label and messages).
//
// Locking contract visible in this body: it is entered with m_mutex held,
// releases m_mutex explicitly for the duration of the file I/O, and
// re-acquires it before touching the containers again. There is no unlock
// after that final lock(), so the function also returns with m_mutex held.
//
// The I/O itself is done under request.mutex: seek to request.start on
// request.fd, then read up to request.size bytes into request.data in
// chunks of at most 256 KiB. The outcome is recorded in the local copy of
// the request (size and successful), which is published to m_readyRequests
// only if the token is still queued and the thread is not exiting.
213 void
215 {
216  // entered with m_mutex locked and m_queue non-empty
217 
218  Profiler profiler("FileReadThread::process", true);
219 
// Work on a copy of the front entry; the entry itself stays in m_queue
// until the result is published below.
220  int token = m_queue.begin()->first;
221  Request request = m_queue.begin()->second;
222 
// Release the container lock while doing file I/O.
// NOTE(review): this is a manual unlock/lock pair rather than a scoped
// guard, so an exception thrown before the matching lock() would leave
// m_mutex unlocked while the caller believes it is held -- confirm
// nothing in between can throw.
223  m_mutex.unlock();
224 
225 #ifdef DEBUG_FILE_READ_THREAD
226  SVDEBUG << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << endl;
227 #endif
228 
229  bool successful = false;
230  bool seekFailed = false;
// r: total bytes read so far, or the negative return value of a failed
// read.
231  ssize_t r = 0;
232 
233  {
234  MutexLocker rlocker(request.mutex, "FileReadThread::process::request.mutex");
235 
236  if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
237  seekFailed = true;
238  } else {
239 
240  // if request.size is large, we want to avoid making a single
241  // system call to read it all as it may block too much
242 
243  static const size_t blockSize = 256 * 1024;
244 
245  size_t size = request.size;
246  char *destination = request.data;
247 
248  while (size > 0) {
249  size_t readSize = size;
250  if (readSize > blockSize) readSize = blockSize;
251  ssize_t br = ::read(request.fd, destination, readSize);
252  if (br < 0) {
// Read error: r is overwritten with the negative value, so the count
// of bytes read by earlier chunks is discarded.
253  r = br;
254  break;
255  } else {
256  r += br;
// A short read ends the loop; r then holds the partial total.
257  if (br < ssize_t(readSize)) break;
258  }
// Only reached when the chunk was read in full (br == readSize), so
// advancing by readSize matches the bytes actually read.
259  destination += readSize;
260  size -= readSize;
261  }
262  }
263  }
264 
// NOTE(review): the perror() calls below run after rlocker's scope has
// ended; if releasing request.mutex can modify errno, the reported
// error text may not match the failed call -- confirm.
265  if (seekFailed) {
266  ::perror("Seek failed");
267  cerr << "ERROR: FileReadThread::process: seek to "
268  << request.start << " failed" << endl;
269  request.size = 0;
270  } else {
271  if (r < 0) {
272  ::perror("ERROR: FileReadThread::process: Read failed");
273  cerr << "ERROR: FileReadThread::process: read of "
274  << request.size << " at "
275  << request.start << " failed" << endl;
276  request.size = 0;
277  } else if (r < ssize_t(request.size)) {
// Short read: report it, shrink request.size to the bytes obtained,
// and pause for 100000 microseconds. successful remains false.
278  cerr << "WARNING: FileReadThread::process: read "
279  << request.size << " returned only " << r << " bytes"
280  << endl;
281  request.size = r;
282  usleep(100000);
283  } else {
284  successful = true;
285  }
286  }
287 
288  // Check that the token hasn't been cancelled and the thread
289  // hasn't been asked to finish
290 
291  m_mutex.lock();
292 
293  request.successful = successful;
294 
// Publish the result only if the token is still queued (cancel() and
// finish() remove it from m_queue) and no exit has been requested;
// otherwise the local copy is simply dropped.
295  if (m_queue.find(token) != m_queue.end() && !m_exiting) {
296  m_queue.erase(token);
297  m_readyRequests[token] = request;
298 #ifdef DEBUG_FILE_READ_THREAD
299  SVDEBUG << "FileReadThread::process: done, marking as ready (success = " << m_readyRequests[token].successful << ")" << endl;
300 #endif
301  } else {
302 #ifdef DEBUG_FILE_READ_THREAD
303  if (m_exiting) {
304  SVDEBUG << "FileReadThread::process: exiting" << endl;
305  } else {
306  SVDEBUG << "FileReadThread::process: request disappeared" << endl;
307  }
308 #endif
309  }
310 }
311 
// Empties m_newlyCancelled (identified as FileReadThread::notifyCancelled
// by its debug message). Tokens are removed one at a time from the front
// of the set; when DEBUG_FILE_READ_THREAD is defined each token is logged
// before removal. No other action is taken per token in the visible body.
// Once a token has been removed here, isCancelled() no longer excludes it
// on account of m_newlyCancelled.
312 void
314 {
315  // entered with m_mutex locked
316 
317  while (!m_newlyCancelled.empty()) {
318 
319  int token = *m_newlyCancelled.begin();
320 
321 #ifdef DEBUG_FILE_READ_THREAD
322  SVDEBUG << "FileReadThread::notifyCancelled: token " << token << endl;
323 #endif
324 
325  m_newlyCancelled.erase(token);
326  }
327 }
328 
329 
QWaitCondition m_condition
virtual bool haveRequest(int token)
virtual void cancel(int token)
RequestQueue m_cancelledRequests
RequestQueue m_readyRequests
std::set< int > m_newlyCancelled
virtual void finish()
RequestQueue m_queue
virtual int request(const Request &request)
#define SVDEBUG
Definition: Debug.h:106
virtual bool isCancelled(int token)
virtual void done(int token)
virtual bool getRequest(int token, Request &request)
virtual bool isReady(int token)
void run() override
Profile point instance class.
Definition: Profiler.h:93