comparison vamp-client/qt/ProcessQtTransport.h @ 150:bf8e3e7dd7de

Move some things around, and add overall test script
author Chris Cannam <cannam@all-day-breakfast.com>
date Fri, 20 Jan 2017 17:45:54 +0000
parents
children 6ccb195d6de6
comparison
equal deleted inserted replaced
149:70bf40743d6a 150:bf8e3e7dd7de
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
2 /*
3 Piper C++
4
5 An API for audio analysis and feature extraction plugins.
6
7 Centre for Digital Music, Queen Mary, University of London.
8 Copyright 2006-2016 Chris Cannam and QMUL.
9
10 Permission is hereby granted, free of charge, to any person
11 obtaining a copy of this software and associated documentation
12 files (the "Software"), to deal in the Software without
13 restriction, including without limitation the rights to use, copy,
14 modify, merge, publish, distribute, sublicense, and/or sell copies
15 of the Software, and to permit persons to whom the Software is
16 furnished to do so, subject to the following conditions:
17
18 The above copyright notice and this permission notice shall be
19 included in all copies or substantial portions of the Software.
20
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28
29 Except as contained in this notice, the names of the Centre for
30 Digital Music; Queen Mary, University of London; and Chris Cannam
31 shall not be used in advertising or otherwise to promote the sale,
32 use or other dealings in this Software without prior written
33 authorization.
34 */
35
36 #ifndef PIPER_PROCESS_QT_TRANSPORT_H
37 #define PIPER_PROCESS_QT_TRANSPORT_H
38
39 #include "SynchronousTransport.h"
40
41 #include <QProcess>
42 #include <QString>
43 #include <QMutex>
44 #include <QTime>
45
46 #include <iostream>
47
48 //#define DEBUG_TRANSPORT 1
49
50 namespace piper_vamp {
51 namespace client {
52
53 /**
54 * A SynchronousTransport implementation that spawns a sub-process
55 * using Qt's QProcess abstraction and talks to it via stdin/stdout
56 * channels. Calls are completely serialized; the protocol only
57 * supports one call in process at a time, and therefore the transport
58 * only allows one at a time.
59 *
60 * This class is thread-safe, but in practice you can only use it from
61 * within a single thread, because the underlying QProcess does not
62 * support switching threads.
63 */
64 class ProcessQtTransport : public SynchronousTransport
65 {
66 public:
67 ProcessQtTransport(std::string processName,
68 std::string formatArg,
69 LogCallback *logger) : // logger may be nullptr for cerr
70 m_logger(logger),
71 m_completenessChecker(0),
72 m_crashed(false) {
73
74 m_process = new QProcess();
75 m_process->setReadChannel(QProcess::StandardOutput);
76 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
77
78 m_process->start(QString::fromStdString(processName),
79 { QString::fromStdString(formatArg) });
80
81 if (!m_process->waitForStarted()) {
82 if (m_process->state() == QProcess::NotRunning) {
83 QProcess::ProcessError err = m_process->error();
84 if (err == QProcess::FailedToStart) {
85 log("Unable to start server process " + processName);
86 } else if (err == QProcess::Crashed) {
87 log("Server process " + processName + " crashed on startup");
88 } else {
89 QString e = QString("%1").arg(err);
90 log("Server process " + processName +
91 " failed on startup with error code " + e.toStdString());
92 }
93 delete m_process;
94 m_process = nullptr;
95 }
96 }
97
98 if (m_process) {
99 log("Server process " + processName + " started OK");
100 }
101 }
102
103 ~ProcessQtTransport() {
104 if (m_process) {
105 if (m_process->state() != QProcess::NotRunning) {
106 m_process->closeWriteChannel();
107 m_process->waitForFinished(200);
108 m_process->close();
109 m_process->waitForFinished();
110 log("Server process exited normally");
111 }
112 delete m_process;
113 }
114 }
115
116 void
117 setCompletenessChecker(MessageCompletenessChecker *checker) override {
118 m_completenessChecker = checker;
119 }
120
121 bool
122 isOK() const override {
123 return (m_process != nullptr) && !m_crashed;
124 }
125
126 std::vector<char>
127 call(const char *ptr, size_t size, std::string type, bool slow) override {
128
129 QMutexLocker locker(&m_mutex);
130
131 if (!m_completenessChecker) {
132 log("call: No completeness checker set on transport");
133 throw std::logic_error("No completeness checker set on transport");
134 }
135 if (!isOK()) {
136 log("call: Transport is not OK");
137 throw std::logic_error("Transport is not OK");
138 }
139
140 #ifdef DEBUG_TRANSPORT
141 std::cerr << "writing " << size << " bytes to server" << std::endl;
142 #endif
143 m_process->write(ptr, size);
144 m_process->waitForBytesWritten();
145
146 std::vector<char> buffer;
147 bool complete = false;
148
149 QTime t;
150 t.start();
151
152 // We don't like to timeout at all while waiting for a
153 // response -- we'd like to wait as long as the server
154 // continues running.
155 //
156 int beforeResponseTimeout = 0; // ms, 0 = no timeout
157
158 // But if the call is marked as fast (i.e. just retrieving
159 // info rather than calculating something) we will time out
160 // after a bit.
161 //
162 if (!slow) beforeResponseTimeout = 10000; // ms, 0 = no timeout
163
164 // But we do timeout if the server sends part of a reply and
165 // then gets stuck. It's reasonable to assume that a server
166 // that's already prepared its message and started sending has
167 // finished doing any real work. In each case the timeout is
168 // measured since data was last read.
169 //
170 int duringResponseTimeout = 5000; // ms, 0 = no timeout
171
172 while (!complete) {
173
174 bool responseStarted = !buffer.empty(); // already have something
175 int ms = t.elapsed(); // time since start or since last read
176
177 qint64 byteCount = m_process->bytesAvailable();
178
179 if (!byteCount) {
180
181 if (responseStarted) {
182 if (duringResponseTimeout > 0 && ms > duringResponseTimeout) {
183 log("Server timed out during response");
184 throw std::runtime_error("Request timed out");
185 }
186 } else {
187 if (beforeResponseTimeout > 0 && ms > beforeResponseTimeout) {
188 log("Server timed out before response");
189 throw std::runtime_error("Request timed out");
190 }
191 }
192
193 #ifdef DEBUG_TRANSPORT
194 std::cerr << "waiting for data from server (slow = " << slow << ")..." << std::endl;
195 #endif
196 if (slow) {
197 m_process->waitForReadyRead(1000);
198 } else {
199 #ifdef _WIN32
200 // This is most unsatisfactory -- if we give a non-zero
201 // arg here, then we end up sleeping way beyond the arrival
202 // of the data to read -- can end up using less than 10%
203 // CPU during processing which is crazy. So for Windows
204 // only, we busy-wait during "fast" calls. It works out
205 // much faster in the end. Could do with a simpler native
206 // blocking API really.
207 m_process->waitForReadyRead(0);
208 #else
209 m_process->waitForReadyRead(100);
210 #endif
211 }
212 if (m_process->state() == QProcess::NotRunning &&
213 // don't give up until we've read all that's been buffered!
214 !m_process->bytesAvailable()) {
215 QProcess::ProcessError err = m_process->error();
216 if (err == QProcess::Crashed) {
217 log("Server crashed during " + type + " request");
218 } else {
219 QString e = QString("%1").arg(err);
220 log("Server failed during " + type
221 + " request with error code " + e.toStdString());
222 }
223 m_crashed = true;
224 throw ServerCrashed();
225 }
226 } else {
227 size_t formerSize = buffer.size();
228 buffer.resize(formerSize + byteCount);
229 m_process->read(buffer.data() + formerSize, byteCount);
230 switch (m_completenessChecker->check(buffer)) {
231 case MessageCompletenessChecker::Complete: complete = true; break;
232 case MessageCompletenessChecker::Incomplete: break;
233 case MessageCompletenessChecker::Invalid:
234 throw std::runtime_error
235 ("Invalid message received: corrupt stream from server?");
236 }
237 (void)t.restart(); // reset timeout when we read anything
238 }
239 }
240
241 return buffer;
242 }
243
244 private:
245 LogCallback *m_logger;
246 MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently)
247 QProcess *m_process; // I own this
248 QMutex m_mutex;
249 bool m_crashed;
250
251 void log(std::string message) const {
252 if (m_logger) m_logger->log(message);
253 else std::cerr << message << std::endl;
254 }
255 };
256
257 }
258 }
259
260 #endif