cannam@111
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
c@118
|
2 /*
|
c@118
|
3 Piper C++
|
c@118
|
4
|
c@118
|
5 An API for audio analysis and feature extraction plugins.
|
c@118
|
6
|
c@118
|
7 Centre for Digital Music, Queen Mary, University of London.
|
c@118
|
8 Copyright 2006-2016 Chris Cannam and QMUL.
|
c@118
|
9
|
c@118
|
10 Permission is hereby granted, free of charge, to any person
|
c@118
|
11 obtaining a copy of this software and associated documentation
|
c@118
|
12 files (the "Software"), to deal in the Software without
|
c@118
|
13 restriction, including without limitation the rights to use, copy,
|
c@118
|
14 modify, merge, publish, distribute, sublicense, and/or sell copies
|
c@118
|
15 of the Software, and to permit persons to whom the Software is
|
c@118
|
16 furnished to do so, subject to the following conditions:
|
c@118
|
17
|
c@118
|
18 The above copyright notice and this permission notice shall be
|
c@118
|
19 included in all copies or substantial portions of the Software.
|
c@118
|
20
|
c@118
|
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
c@118
|
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
c@118
|
23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
c@118
|
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
|
c@118
|
25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
|
c@118
|
26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
c@118
|
27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
c@118
|
28
|
c@118
|
29 Except as contained in this notice, the names of the Centre for
|
c@118
|
30 Digital Music; Queen Mary, University of London; and Chris Cannam
|
c@118
|
31 shall not be used in advertising or otherwise to promote the sale,
|
c@118
|
32 use or other dealings in this Software without prior written
|
c@118
|
33 authorization.
|
c@118
|
34 */
|
cannam@111
|
35
|
c@94
|
36 #ifndef PIPER_PROCESS_QT_TRANSPORT_H
|
c@94
|
37 #define PIPER_PROCESS_QT_TRANSPORT_H
|
c@94
|
38
|
c@94
|
39 #include "SynchronousTransport.h"
|
c@94
|
40
|
c@94
|
41 #include <QProcess>
|
c@94
|
42 #include <QString>
|
c@100
|
43 #include <QMutex>
|
c@148
|
44 #include <QTime>
|
c@94
|
45
|
c@94
|
46 #include <iostream>
|
c@94
|
47
|
c@124
|
48 //#define DEBUG_TRANSPORT 1
|
c@124
|
49
|
c@97
|
50 namespace piper_vamp {
|
c@97
|
51 namespace client {
|
c@94
|
52
|
c@100
|
53 /**
|
c@100
|
54 * A SynchronousTransport implementation that spawns a sub-process
|
c@100
|
55 * using Qt's QProcess abstraction and talks to it via stdin/stdout
|
c@100
|
56 * channels. Calls are completely serialized; the protocol only
|
c@100
|
57 * supports one call in process at a time, and therefore the transport
|
c@125
|
58 * only allows one at a time.
|
c@125
|
59 *
|
c@125
|
60 * This class is thread-safe, but in practice you can only use it from
|
c@125
|
61 * within a single thread, because the underlying QProcess does not
|
c@125
|
62 * support switching threads.
|
c@100
|
63 */
|
c@94
|
64 class ProcessQtTransport : public SynchronousTransport
|
c@94
|
65 {
|
c@94
|
66 public:
|
c@134
|
67 ProcessQtTransport(std::string processName,
|
c@134
|
68 std::string formatArg,
|
c@134
|
69 LogCallback *logger) : // logger may be nullptr for cerr
|
c@134
|
70 m_logger(logger),
|
c@126
|
71 m_completenessChecker(0),
|
c@126
|
72 m_crashed(false) {
|
c@119
|
73
|
c@94
|
74 m_process = new QProcess();
|
c@94
|
75 m_process->setReadChannel(QProcess::StandardOutput);
|
c@94
|
76 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
|
cannam@114
|
77
|
c@119
|
78 m_process->start(QString::fromStdString(processName),
|
c@124
|
79 { QString::fromStdString(formatArg) });
|
cannam@114
|
80
|
c@94
|
81 if (!m_process->waitForStarted()) {
|
cannam@113
|
82 if (m_process->state() == QProcess::NotRunning) {
|
cannam@113
|
83 QProcess::ProcessError err = m_process->error();
|
cannam@113
|
84 if (err == QProcess::FailedToStart) {
|
c@134
|
85 log("Unable to start server process " + processName);
|
cannam@113
|
86 } else if (err == QProcess::Crashed) {
|
c@134
|
87 log("Server process " + processName + " crashed on startup");
|
cannam@113
|
88 } else {
|
c@134
|
89 QString e = QString("%1").arg(err);
|
c@134
|
90 log("Server process " + processName +
|
c@134
|
91 " failed on startup with error code " + e.toStdString());
|
cannam@113
|
92 }
|
cannam@113
|
93 delete m_process;
|
cannam@113
|
94 m_process = nullptr;
|
cannam@111
|
95 }
|
c@94
|
96 }
|
c@134
|
97
|
c@134
|
98 if (m_process) {
|
c@134
|
99 log("Server process " + processName + " started OK");
|
c@134
|
100 }
|
c@94
|
101 }
|
c@94
|
102
|
c@94
|
103 ~ProcessQtTransport() {
|
c@94
|
104 if (m_process) {
|
c@94
|
105 if (m_process->state() != QProcess::NotRunning) {
|
c@118
|
106 m_process->closeWriteChannel();
|
c@94
|
107 m_process->waitForFinished(200);
|
c@94
|
108 m_process->close();
|
c@94
|
109 m_process->waitForFinished();
|
c@134
|
110 log("Server process exited normally");
|
c@94
|
111 }
|
c@94
|
112 delete m_process;
|
c@94
|
113 }
|
c@94
|
114 }
|
c@94
|
115
|
c@94
|
116 void
|
cannam@111
|
117 setCompletenessChecker(MessageCompletenessChecker *checker) override {
|
c@94
|
118 m_completenessChecker = checker;
|
c@94
|
119 }
|
c@94
|
120
|
c@94
|
121 bool
|
c@94
|
122 isOK() const override {
|
c@126
|
123 return (m_process != nullptr) && !m_crashed;
|
c@94
|
124 }
|
c@94
|
125
|
c@94
|
126 std::vector<char>
|
c@134
|
127 call(const char *ptr, size_t size, std::string type, bool slow) override {
|
c@94
|
128
|
c@118
|
129 QMutexLocker locker(&m_mutex);
|
c@118
|
130
|
c@94
|
131 if (!m_completenessChecker) {
|
c@134
|
132 log("call: No completeness checker set on transport");
|
c@94
|
133 throw std::logic_error("No completeness checker set on transport");
|
c@94
|
134 }
|
c@126
|
135 if (!isOK()) {
|
c@134
|
136 log("call: Transport is not OK");
|
c@126
|
137 throw std::logic_error("Transport is not OK");
|
c@126
|
138 }
|
c@94
|
139
|
c@124
|
140 #ifdef DEBUG_TRANSPORT
|
c@115
|
141 std::cerr << "writing " << size << " bytes to server" << std::endl;
|
c@124
|
142 #endif
|
c@94
|
143 m_process->write(ptr, size);
|
c@126
|
144 m_process->waitForBytesWritten();
|
c@94
|
145
|
c@94
|
146 std::vector<char> buffer;
|
c@94
|
147 bool complete = false;
|
c@148
|
148
|
c@148
|
149 QTime t;
|
c@148
|
150 t.start();
|
c@148
|
151
|
c@148
|
152 // We don't like to timeout at all while waiting for a
|
c@148
|
153 // response -- we'd like to wait as long as the server
|
c@148
|
154 // continues running.
|
c@148
|
155 //
|
c@148
|
156 int beforeResponseTimeout = 0; // ms, 0 = no timeout
|
c@148
|
157
|
c@148
|
158 // But if the call is marked as fast (i.e. just retrieving
|
c@148
|
159 // info rather than calculating something) we will time out
|
c@148
|
160 // after a bit.
|
c@148
|
161 //
|
c@148
|
162 if (!slow) beforeResponseTimeout = 10000; // ms, 0 = no timeout
|
c@148
|
163
|
c@148
|
164 // But we do timeout if the server sends part of a reply and
|
c@148
|
165 // then gets stuck. It's reasonable to assume that a server
|
c@148
|
166 // that's already prepared its message and started sending has
|
c@148
|
167 // finished doing any real work. In each case the timeout is
|
c@148
|
168 // measured since data was last read.
|
c@148
|
169 //
|
c@148
|
170 int duringResponseTimeout = 5000; // ms, 0 = no timeout
|
c@94
|
171
|
c@94
|
172 while (!complete) {
|
c@94
|
173
|
c@148
|
174 bool responseStarted = !buffer.empty(); // already have something
|
c@148
|
175 int ms = t.elapsed(); // time since start or since last read
|
c@148
|
176
|
c@94
|
177 qint64 byteCount = m_process->bytesAvailable();
|
c@94
|
178
|
c@118
|
179 if (!byteCount) {
|
c@148
|
180
|
c@148
|
181 if (responseStarted) {
|
c@148
|
182 if (duringResponseTimeout > 0 && ms > duringResponseTimeout) {
|
c@148
|
183 log("Server timed out during response");
|
c@148
|
184 throw std::runtime_error("Request timed out");
|
c@148
|
185 }
|
c@148
|
186 } else {
|
c@148
|
187 if (beforeResponseTimeout > 0 && ms > beforeResponseTimeout) {
|
c@148
|
188 log("Server timed out before response");
|
c@148
|
189 throw std::runtime_error("Request timed out");
|
c@148
|
190 }
|
c@148
|
191 }
|
c@148
|
192
|
c@124
|
193 #ifdef DEBUG_TRANSPORT
|
c@145
|
194 std::cerr << "waiting for data from server (slow = " << slow << ")..." << std::endl;
|
c@124
|
195 #endif
|
c@126
|
196 if (slow) {
|
c@126
|
197 m_process->waitForReadyRead(1000);
|
c@126
|
198 } else {
|
c@126
|
199 #ifdef _WIN32
|
c@126
|
200 // This is most unsatisfactory -- if we give a non-zero
|
c@126
|
201 // arg here, then we end up sleeping way beyond the arrival
|
c@126
|
202 // of the data to read -- can end up using less than 10%
|
c@126
|
203 // CPU during processing which is crazy. So for Windows
|
c@126
|
204 // only, we busy-wait during "fast" calls. It works out
|
c@126
|
205 // much faster in the end. Could do with a simpler native
|
c@126
|
206 // blocking API really.
|
c@126
|
207 m_process->waitForReadyRead(0);
|
c@126
|
208 #else
|
c@126
|
209 m_process->waitForReadyRead(100);
|
c@126
|
210 #endif
|
c@126
|
211 }
|
c@126
|
212 if (m_process->state() == QProcess::NotRunning &&
|
c@126
|
213 // don't give up until we've read all that's been buffered!
|
c@126
|
214 !m_process->bytesAvailable()) {
|
c@115
|
215 QProcess::ProcessError err = m_process->error();
|
c@115
|
216 if (err == QProcess::Crashed) {
|
c@134
|
217 log("Server crashed during " + type + " request");
|
c@115
|
218 } else {
|
c@134
|
219 QString e = QString("%1").arg(err);
|
c@134
|
220 log("Server failed during " + type
|
c@134
|
221 + " request with error code " + e.toStdString());
|
c@115
|
222 }
|
c@126
|
223 m_crashed = true;
|
c@121
|
224 throw ServerCrashed();
|
c@94
|
225 }
|
c@94
|
226 } else {
|
c@94
|
227 size_t formerSize = buffer.size();
|
c@94
|
228 buffer.resize(formerSize + byteCount);
|
c@94
|
229 m_process->read(buffer.data() + formerSize, byteCount);
|
c@146
|
230 switch (m_completenessChecker->check(buffer)) {
|
c@146
|
231 case MessageCompletenessChecker::Complete: complete = true; break;
|
c@146
|
232 case MessageCompletenessChecker::Incomplete: break;
|
c@146
|
233 case MessageCompletenessChecker::Invalid:
|
c@146
|
234 throw std::runtime_error
|
c@146
|
235 ("Invalid message received: corrupt stream from server?");
|
c@146
|
236 }
|
c@148
|
237 (void)t.restart(); // reset timeout when we read anything
|
c@94
|
238 }
|
c@94
|
239 }
|
c@94
|
240
|
c@94
|
241 return buffer;
|
c@94
|
242 }
|
c@94
|
243
|
c@94
|
244 private:
|
c@134
|
245 LogCallback *m_logger;
|
c@94
|
246 MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently)
|
c@94
|
247 QProcess *m_process; // I own this
|
c@100
|
248 QMutex m_mutex;
|
c@126
|
249 bool m_crashed;
|
c@134
|
250
|
c@134
|
251 void log(std::string message) const {
|
c@134
|
252 if (m_logger) m_logger->log(message);
|
c@134
|
253 else std::cerr << message << std::endl;
|
c@134
|
254 }
|
c@94
|
255 };
|
c@94
|
256
|
c@94
|
257 }
|
c@94
|
258 }
|
c@94
|
259
|
c@94
|
260 #endif
|