cannam@150
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
cannam@150
|
2 /*
|
cannam@150
|
3 Piper C++
|
cannam@150
|
4
|
cannam@150
|
5 An API for audio analysis and feature extraction plugins.
|
cannam@150
|
6
|
cannam@150
|
7 Centre for Digital Music, Queen Mary, University of London.
|
cannam@150
|
8 Copyright 2006-2016 Chris Cannam and QMUL.
|
cannam@150
|
9
|
cannam@150
|
10 Permission is hereby granted, free of charge, to any person
|
cannam@150
|
11 obtaining a copy of this software and associated documentation
|
cannam@150
|
12 files (the "Software"), to deal in the Software without
|
cannam@150
|
13 restriction, including without limitation the rights to use, copy,
|
cannam@150
|
14 modify, merge, publish, distribute, sublicense, and/or sell copies
|
cannam@150
|
15 of the Software, and to permit persons to whom the Software is
|
cannam@150
|
16 furnished to do so, subject to the following conditions:
|
cannam@150
|
17
|
cannam@150
|
18 The above copyright notice and this permission notice shall be
|
cannam@150
|
19 included in all copies or substantial portions of the Software.
|
cannam@150
|
20
|
cannam@150
|
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
cannam@150
|
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
cannam@150
|
23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
cannam@150
|
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
|
cannam@150
|
25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
|
cannam@150
|
26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
cannam@150
|
27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
cannam@150
|
28
|
cannam@150
|
29 Except as contained in this notice, the names of the Centre for
|
cannam@150
|
30 Digital Music; Queen Mary, University of London; and Chris Cannam
|
cannam@150
|
31 shall not be used in advertising or otherwise to promote the sale,
|
cannam@150
|
32 use or other dealings in this Software without prior written
|
cannam@150
|
33 authorization.
|
cannam@150
|
34 */
|
cannam@150
|
35
|
cannam@150
|
36 #ifndef PIPER_PROCESS_QT_TRANSPORT_H
|
cannam@150
|
37 #define PIPER_PROCESS_QT_TRANSPORT_H
|
cannam@150
|
38
|
cannam@152
|
39 #include "../SynchronousTransport.h"
|
cannam@170
|
40 #include "../Exceptions.h"
|
cannam@150
|
41
|
cannam@150
|
42 #include <QProcess>
|
cannam@150
|
43 #include <QString>
|
cannam@150
|
44 #include <QMutex>
|
cannam@286
|
45 #include <QElapsedTimer>
|
cannam@150
|
46
|
cannam@150
|
47 #include <iostream>
|
cannam@150
|
48
|
cannam@150
|
49 //#define DEBUG_TRANSPORT 1
|
cannam@150
|
50
|
cannam@150
|
51 namespace piper_vamp {
|
cannam@150
|
52 namespace client {
|
cannam@150
|
53
|
cannam@150
|
54 /**
|
cannam@150
|
55 * A SynchronousTransport implementation that spawns a sub-process
|
cannam@150
|
56 * using Qt's QProcess abstraction and talks to it via stdin/stdout
|
cannam@150
|
57 * channels. Calls are completely serialized; the protocol only
|
cannam@150
|
58 * supports one call in process at a time, and therefore the transport
|
cannam@150
|
59 * only allows one at a time.
|
cannam@150
|
60 *
|
cannam@150
|
61 * This class is thread-safe, but in practice you can only use it from
|
cannam@150
|
62 * within a single thread, because the underlying QProcess does not
|
cannam@150
|
63 * support switching threads.
|
cannam@150
|
64 */
|
cannam@150
|
65 class ProcessQtTransport : public SynchronousTransport
|
cannam@150
|
66 {
|
cannam@150
|
67 public:
|
cannam@150
|
68 ProcessQtTransport(std::string processName,
|
cannam@150
|
69 std::string formatArg,
|
cannam@150
|
70 LogCallback *logger) : // logger may be nullptr for cerr
|
cannam@150
|
71 m_logger(logger),
|
cannam@150
|
72 m_completenessChecker(0),
|
cannam@150
|
73 m_crashed(false) {
|
cannam@150
|
74
|
cannam@150
|
75 m_process = new QProcess();
|
cannam@150
|
76 m_process->setReadChannel(QProcess::StandardOutput);
|
cannam@282
|
77
|
cannam@282
|
78 if (m_logger) {
|
cannam@282
|
79 m_process->setProcessChannelMode(QProcess::SeparateChannels);
|
cannam@282
|
80 } else {
|
cannam@282
|
81 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
|
cannam@282
|
82 }
|
cannam@150
|
83
|
cannam@150
|
84 m_process->start(QString::fromStdString(processName),
|
cannam@150
|
85 { QString::fromStdString(formatArg) });
|
cannam@150
|
86
|
cannam@150
|
87 if (!m_process->waitForStarted()) {
|
cannam@150
|
88 if (m_process->state() == QProcess::NotRunning) {
|
cannam@150
|
89 QProcess::ProcessError err = m_process->error();
|
cannam@150
|
90 if (err == QProcess::FailedToStart) {
|
cannam@150
|
91 log("Unable to start server process " + processName);
|
cannam@150
|
92 } else if (err == QProcess::Crashed) {
|
cannam@150
|
93 log("Server process " + processName + " crashed on startup");
|
cannam@150
|
94 } else {
|
cannam@150
|
95 QString e = QString("%1").arg(err);
|
cannam@150
|
96 log("Server process " + processName +
|
cannam@150
|
97 " failed on startup with error code " + e.toStdString());
|
cannam@150
|
98 }
|
cannam@282
|
99 logServerErrors();
|
cannam@150
|
100 delete m_process;
|
cannam@150
|
101 m_process = nullptr;
|
cannam@150
|
102 }
|
cannam@150
|
103 }
|
cannam@150
|
104
|
cannam@150
|
105 if (m_process) {
|
cannam@150
|
106 log("Server process " + processName + " started OK");
|
cannam@282
|
107 logServerErrors();
|
cannam@150
|
108 }
|
cannam@150
|
109 }
|
cannam@150
|
110
|
cannam@150
|
111 ~ProcessQtTransport() {
|
cannam@150
|
112 if (m_process) {
|
cannam@150
|
113 if (m_process->state() != QProcess::NotRunning) {
|
cannam@150
|
114 m_process->closeWriteChannel();
|
cannam@150
|
115 m_process->waitForFinished(200);
|
cannam@150
|
116 m_process->close();
|
cannam@150
|
117 m_process->waitForFinished();
|
cannam@150
|
118 log("Server process exited normally");
|
cannam@150
|
119 }
|
cannam@282
|
120 logServerErrors();
|
cannam@150
|
121 delete m_process;
|
cannam@150
|
122 }
|
cannam@150
|
123 }
|
cannam@150
|
124
|
cannam@150
|
125 void
|
cannam@150
|
126 setCompletenessChecker(MessageCompletenessChecker *checker) override {
|
cannam@150
|
127 m_completenessChecker = checker;
|
cannam@150
|
128 }
|
cannam@150
|
129
|
cannam@150
|
130 bool
|
cannam@150
|
131 isOK() const override {
|
cannam@150
|
132 return (m_process != nullptr) && !m_crashed;
|
cannam@150
|
133 }
|
cannam@150
|
134
|
cannam@150
|
135 std::vector<char>
|
cannam@150
|
136 call(const char *ptr, size_t size, std::string type, bool slow) override {
|
cannam@150
|
137
|
cannam@150
|
138 QMutexLocker locker(&m_mutex);
|
cannam@150
|
139
|
cannam@150
|
140 if (!m_completenessChecker) {
|
cannam@150
|
141 log("call: No completeness checker set on transport");
|
cannam@150
|
142 throw std::logic_error("No completeness checker set on transport");
|
cannam@150
|
143 }
|
cannam@150
|
144 if (!isOK()) {
|
cannam@150
|
145 log("call: Transport is not OK");
|
cannam@150
|
146 throw std::logic_error("Transport is not OK");
|
cannam@150
|
147 }
|
cannam@150
|
148
|
cannam@150
|
149 #ifdef DEBUG_TRANSPORT
|
cannam@150
|
150 std::cerr << "writing " << size << " bytes to server" << std::endl;
|
cannam@150
|
151 #endif
|
cannam@150
|
152 m_process->write(ptr, size);
|
cannam@150
|
153 m_process->waitForBytesWritten();
|
cannam@150
|
154
|
cannam@150
|
155 std::vector<char> buffer;
|
cannam@150
|
156 bool complete = false;
|
cannam@150
|
157
|
cannam@286
|
158 QElapsedTimer t;
|
cannam@150
|
159 t.start();
|
cannam@150
|
160
|
cannam@150
|
161 // We don't like to timeout at all while waiting for a
|
cannam@150
|
162 // response -- we'd like to wait as long as the server
|
cannam@150
|
163 // continues running.
|
cannam@150
|
164 //
|
cannam@286
|
165 qint64 beforeResponseTimeout = 0; // ms, 0 = no timeout
|
cannam@150
|
166
|
cannam@150
|
167 // But if the call is marked as fast (i.e. just retrieving
|
cannam@150
|
168 // info rather than calculating something) we will time out
|
cannam@150
|
169 // after a bit.
|
cannam@150
|
170 //
|
cannam@150
|
171 if (!slow) beforeResponseTimeout = 10000; // ms, 0 = no timeout
|
cannam@150
|
172
|
cannam@150
|
173 // But we do timeout if the server sends part of a reply and
|
cannam@150
|
174 // then gets stuck. It's reasonable to assume that a server
|
cannam@150
|
175 // that's already prepared its message and started sending has
|
cannam@150
|
176 // finished doing any real work. In each case the timeout is
|
cannam@150
|
177 // measured since data was last read.
|
cannam@150
|
178 //
|
cannam@286
|
179 qint64 duringResponseTimeout = 5000; // ms, 0 = no timeout
|
cannam@150
|
180
|
cannam@150
|
181 while (!complete) {
|
cannam@150
|
182
|
cannam@150
|
183 bool responseStarted = !buffer.empty(); // already have something
|
cannam@286
|
184 qint64 ms = t.elapsed(); // time since start or since last read
|
cannam@150
|
185
|
cannam@150
|
186 qint64 byteCount = m_process->bytesAvailable();
|
cannam@150
|
187
|
cannam@150
|
188 if (!byteCount) {
|
cannam@150
|
189
|
cannam@150
|
190 if (responseStarted) {
|
cannam@150
|
191 if (duringResponseTimeout > 0 && ms > duringResponseTimeout) {
|
cannam@150
|
192 log("Server timed out during response");
|
cannam@282
|
193 logServerErrors();
|
cannam@170
|
194 m_crashed = true;
|
cannam@170
|
195 throw RequestTimedOut();
|
cannam@150
|
196 }
|
cannam@150
|
197 } else {
|
cannam@150
|
198 if (beforeResponseTimeout > 0 && ms > beforeResponseTimeout) {
|
cannam@150
|
199 log("Server timed out before response");
|
cannam@282
|
200 logServerErrors();
|
cannam@170
|
201 m_crashed = true;
|
cannam@170
|
202 throw RequestTimedOut();
|
cannam@150
|
203 }
|
cannam@150
|
204 }
|
cannam@150
|
205
|
cannam@150
|
206 #ifdef DEBUG_TRANSPORT
|
cannam@150
|
207 std::cerr << "waiting for data from server (slow = " << slow << ")..." << std::endl;
|
cannam@150
|
208 #endif
|
cannam@150
|
209 if (slow) {
|
cannam@150
|
210 m_process->waitForReadyRead(1000);
|
cannam@150
|
211 } else {
|
cannam@150
|
212 #ifdef _WIN32
|
cannam@150
|
213 // This is most unsatisfactory -- if we give a non-zero
|
cannam@150
|
214 // arg here, then we end up sleeping way beyond the arrival
|
cannam@150
|
215 // of the data to read -- can end up using less than 10%
|
cannam@150
|
216 // CPU during processing which is crazy. So for Windows
|
cannam@150
|
217 // only, we busy-wait during "fast" calls. It works out
|
cannam@150
|
218 // much faster in the end. Could do with a simpler native
|
cannam@150
|
219 // blocking API really.
|
cannam@150
|
220 m_process->waitForReadyRead(0);
|
cannam@150
|
221 #else
|
cannam@150
|
222 m_process->waitForReadyRead(100);
|
cannam@150
|
223 #endif
|
cannam@150
|
224 }
|
cannam@150
|
225 if (m_process->state() == QProcess::NotRunning &&
|
cannam@150
|
226 // don't give up until we've read all that's been buffered!
|
cannam@150
|
227 !m_process->bytesAvailable()) {
|
cannam@150
|
228 QProcess::ProcessError err = m_process->error();
|
cannam@150
|
229 if (err == QProcess::Crashed) {
|
cannam@150
|
230 log("Server crashed during " + type + " request");
|
cannam@150
|
231 } else {
|
cannam@150
|
232 QString e = QString("%1").arg(err);
|
cannam@150
|
233 log("Server failed during " + type
|
cannam@150
|
234 + " request with error code " + e.toStdString());
|
cannam@150
|
235 }
|
cannam@150
|
236 m_crashed = true;
|
cannam@150
|
237 throw ServerCrashed();
|
cannam@150
|
238 }
|
cannam@150
|
239 } else {
|
cannam@150
|
240 size_t formerSize = buffer.size();
|
cannam@150
|
241 buffer.resize(formerSize + byteCount);
|
cannam@150
|
242 m_process->read(buffer.data() + formerSize, byteCount);
|
cannam@150
|
243 switch (m_completenessChecker->check(buffer)) {
|
cannam@150
|
244 case MessageCompletenessChecker::Complete: complete = true; break;
|
cannam@150
|
245 case MessageCompletenessChecker::Incomplete: break;
|
cannam@170
|
246 case MessageCompletenessChecker::Invalid: throw ProtocolError();
|
cannam@150
|
247 }
|
cannam@150
|
248 (void)t.restart(); // reset timeout when we read anything
|
cannam@150
|
249 }
|
cannam@150
|
250 }
|
cannam@150
|
251
|
cannam@282
|
252 logServerErrors();
|
cannam@150
|
253 return buffer;
|
cannam@150
|
254 }
|
cannam@150
|
255
|
cannam@150
|
256 private:
|
cannam@150
|
257 LogCallback *m_logger;
|
cannam@150
|
258 MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently)
|
cannam@150
|
259 QProcess *m_process; // I own this
|
cannam@150
|
260 QMutex m_mutex;
|
cannam@150
|
261 bool m_crashed;
|
cannam@150
|
262
|
cannam@150
|
263 void log(std::string message) const {
|
cannam@150
|
264 if (m_logger) m_logger->log(message);
|
cannam@150
|
265 else std::cerr << message << std::endl;
|
cannam@150
|
266 }
|
cannam@282
|
267
|
cannam@282
|
268 void logServerErrors() const {
|
cannam@282
|
269 if (!m_logger || !m_process) return;
|
cannam@282
|
270
|
cannam@282
|
271 m_process->setReadChannel(QProcess::StandardError);
|
cannam@282
|
272
|
cannam@282
|
273 qint64 byteCount = m_process->bytesAvailable();
|
cannam@282
|
274 if (byteCount == 0) {
|
cannam@282
|
275 m_process->setReadChannel(QProcess::StandardOutput);
|
cannam@282
|
276 return;
|
cannam@282
|
277 }
|
cannam@282
|
278
|
cannam@282
|
279 QByteArray buffer = m_process->read(byteCount);
|
cannam@285
|
280 while (buffer.endsWith('\n') || buffer.endsWith('\r')) {
|
cannam@285
|
281 buffer.chop(1);
|
cannam@285
|
282 }
|
cannam@284
|
283 std::string str(buffer.constData(), buffer.size());
|
cannam@282
|
284 m_logger->log("Piper server stderr output follows:\n" + str);
|
cannam@282
|
285 m_logger->log("Piper server stderr output ends");
|
cannam@282
|
286
|
cannam@282
|
287 m_process->setReadChannel(QProcess::StandardOutput);
|
cannam@282
|
288 }
|
cannam@150
|
289 };
|
cannam@150
|
290
|
cannam@150
|
291 }
|
cannam@150
|
292 }
|
cannam@150
|
293
|
cannam@150
|
294 #endif
|