Mercurial > hg > piper-cpp
comparison vamp-client/qt/ProcessQtTransport.h @ 150:bf8e3e7dd7de
Move some things around, and add overall test script
author | Chris Cannam <cannam@all-day-breakfast.com> |
---|---|
date | Fri, 20 Jan 2017 17:45:54 +0000 |
parents | |
children | 6ccb195d6de6 |
comparison
equal
deleted
inserted
replaced
149:70bf40743d6a | 150:bf8e3e7dd7de |
---|---|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ | |
2 /* | |
3 Piper C++ | |
4 | |
5 An API for audio analysis and feature extraction plugins. | |
6 | |
7 Centre for Digital Music, Queen Mary, University of London. | |
8 Copyright 2006-2016 Chris Cannam and QMUL. | |
9 | |
10 Permission is hereby granted, free of charge, to any person | |
11 obtaining a copy of this software and associated documentation | |
12 files (the "Software"), to deal in the Software without | |
13 restriction, including without limitation the rights to use, copy, | |
14 modify, merge, publish, distribute, sublicense, and/or sell copies | |
15 of the Software, and to permit persons to whom the Software is | |
16 furnished to do so, subject to the following conditions: | |
17 | |
18 The above copyright notice and this permission notice shall be | |
19 included in all copies or substantial portions of the Software. | |
20 | |
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR | |
25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF | |
26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION | |
27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
28 | |
29 Except as contained in this notice, the names of the Centre for | |
30 Digital Music; Queen Mary, University of London; and Chris Cannam | |
31 shall not be used in advertising or otherwise to promote the sale, | |
32 use or other dealings in this Software without prior written | |
33 authorization. | |
34 */ | |
35 | |
36 #ifndef PIPER_PROCESS_QT_TRANSPORT_H | |
37 #define PIPER_PROCESS_QT_TRANSPORT_H | |
38 | |
39 #include "SynchronousTransport.h" | |
40 | |
41 #include <QProcess> | |
42 #include <QString> | |
43 #include <QMutex> | |
44 #include <QTime> | |
45 | |
46 #include <iostream> | |
47 | |
48 //#define DEBUG_TRANSPORT 1 | |
49 | |
50 namespace piper_vamp { | |
51 namespace client { | |
52 | |
53 /** | |
54 * A SynchronousTransport implementation that spawns a sub-process | |
55 * using Qt's QProcess abstraction and talks to it via stdin/stdout | |
56 * channels. Calls are completely serialized; the protocol only | |
57 * supports one call in process at a time, and therefore the transport | |
58 * only allows one at a time. | |
59 * | |
60 * This class is thread-safe, but in practice you can only use it from | |
61 * within a single thread, because the underlying QProcess does not | |
62 * support switching threads. | |
63 */ | |
64 class ProcessQtTransport : public SynchronousTransport | |
65 { | |
66 public: | |
67 ProcessQtTransport(std::string processName, | |
68 std::string formatArg, | |
69 LogCallback *logger) : // logger may be nullptr for cerr | |
70 m_logger(logger), | |
71 m_completenessChecker(0), | |
72 m_crashed(false) { | |
73 | |
74 m_process = new QProcess(); | |
75 m_process->setReadChannel(QProcess::StandardOutput); | |
76 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel); | |
77 | |
78 m_process->start(QString::fromStdString(processName), | |
79 { QString::fromStdString(formatArg) }); | |
80 | |
81 if (!m_process->waitForStarted()) { | |
82 if (m_process->state() == QProcess::NotRunning) { | |
83 QProcess::ProcessError err = m_process->error(); | |
84 if (err == QProcess::FailedToStart) { | |
85 log("Unable to start server process " + processName); | |
86 } else if (err == QProcess::Crashed) { | |
87 log("Server process " + processName + " crashed on startup"); | |
88 } else { | |
89 QString e = QString("%1").arg(err); | |
90 log("Server process " + processName + | |
91 " failed on startup with error code " + e.toStdString()); | |
92 } | |
93 delete m_process; | |
94 m_process = nullptr; | |
95 } | |
96 } | |
97 | |
98 if (m_process) { | |
99 log("Server process " + processName + " started OK"); | |
100 } | |
101 } | |
102 | |
103 ~ProcessQtTransport() { | |
104 if (m_process) { | |
105 if (m_process->state() != QProcess::NotRunning) { | |
106 m_process->closeWriteChannel(); | |
107 m_process->waitForFinished(200); | |
108 m_process->close(); | |
109 m_process->waitForFinished(); | |
110 log("Server process exited normally"); | |
111 } | |
112 delete m_process; | |
113 } | |
114 } | |
115 | |
116 void | |
117 setCompletenessChecker(MessageCompletenessChecker *checker) override { | |
118 m_completenessChecker = checker; | |
119 } | |
120 | |
121 bool | |
122 isOK() const override { | |
123 return (m_process != nullptr) && !m_crashed; | |
124 } | |
125 | |
126 std::vector<char> | |
127 call(const char *ptr, size_t size, std::string type, bool slow) override { | |
128 | |
129 QMutexLocker locker(&m_mutex); | |
130 | |
131 if (!m_completenessChecker) { | |
132 log("call: No completeness checker set on transport"); | |
133 throw std::logic_error("No completeness checker set on transport"); | |
134 } | |
135 if (!isOK()) { | |
136 log("call: Transport is not OK"); | |
137 throw std::logic_error("Transport is not OK"); | |
138 } | |
139 | |
140 #ifdef DEBUG_TRANSPORT | |
141 std::cerr << "writing " << size << " bytes to server" << std::endl; | |
142 #endif | |
143 m_process->write(ptr, size); | |
144 m_process->waitForBytesWritten(); | |
145 | |
146 std::vector<char> buffer; | |
147 bool complete = false; | |
148 | |
149 QTime t; | |
150 t.start(); | |
151 | |
152 // We don't like to timeout at all while waiting for a | |
153 // response -- we'd like to wait as long as the server | |
154 // continues running. | |
155 // | |
156 int beforeResponseTimeout = 0; // ms, 0 = no timeout | |
157 | |
158 // But if the call is marked as fast (i.e. just retrieving | |
159 // info rather than calculating something) we will time out | |
160 // after a bit. | |
161 // | |
162 if (!slow) beforeResponseTimeout = 10000; // ms, 0 = no timeout | |
163 | |
164 // But we do timeout if the server sends part of a reply and | |
165 // then gets stuck. It's reasonable to assume that a server | |
166 // that's already prepared its message and started sending has | |
167 // finished doing any real work. In each case the timeout is | |
168 // measured since data was last read. | |
169 // | |
170 int duringResponseTimeout = 5000; // ms, 0 = no timeout | |
171 | |
172 while (!complete) { | |
173 | |
174 bool responseStarted = !buffer.empty(); // already have something | |
175 int ms = t.elapsed(); // time since start or since last read | |
176 | |
177 qint64 byteCount = m_process->bytesAvailable(); | |
178 | |
179 if (!byteCount) { | |
180 | |
181 if (responseStarted) { | |
182 if (duringResponseTimeout > 0 && ms > duringResponseTimeout) { | |
183 log("Server timed out during response"); | |
184 throw std::runtime_error("Request timed out"); | |
185 } | |
186 } else { | |
187 if (beforeResponseTimeout > 0 && ms > beforeResponseTimeout) { | |
188 log("Server timed out before response"); | |
189 throw std::runtime_error("Request timed out"); | |
190 } | |
191 } | |
192 | |
193 #ifdef DEBUG_TRANSPORT | |
194 std::cerr << "waiting for data from server (slow = " << slow << ")..." << std::endl; | |
195 #endif | |
196 if (slow) { | |
197 m_process->waitForReadyRead(1000); | |
198 } else { | |
199 #ifdef _WIN32 | |
200 // This is most unsatisfactory -- if we give a non-zero | |
201 // arg here, then we end up sleeping way beyond the arrival | |
202 // of the data to read -- can end up using less than 10% | |
203 // CPU during processing which is crazy. So for Windows | |
204 // only, we busy-wait during "fast" calls. It works out | |
205 // much faster in the end. Could do with a simpler native | |
206 // blocking API really. | |
207 m_process->waitForReadyRead(0); | |
208 #else | |
209 m_process->waitForReadyRead(100); | |
210 #endif | |
211 } | |
212 if (m_process->state() == QProcess::NotRunning && | |
213 // don't give up until we've read all that's been buffered! | |
214 !m_process->bytesAvailable()) { | |
215 QProcess::ProcessError err = m_process->error(); | |
216 if (err == QProcess::Crashed) { | |
217 log("Server crashed during " + type + " request"); | |
218 } else { | |
219 QString e = QString("%1").arg(err); | |
220 log("Server failed during " + type | |
221 + " request with error code " + e.toStdString()); | |
222 } | |
223 m_crashed = true; | |
224 throw ServerCrashed(); | |
225 } | |
226 } else { | |
227 size_t formerSize = buffer.size(); | |
228 buffer.resize(formerSize + byteCount); | |
229 m_process->read(buffer.data() + formerSize, byteCount); | |
230 switch (m_completenessChecker->check(buffer)) { | |
231 case MessageCompletenessChecker::Complete: complete = true; break; | |
232 case MessageCompletenessChecker::Incomplete: break; | |
233 case MessageCompletenessChecker::Invalid: | |
234 throw std::runtime_error | |
235 ("Invalid message received: corrupt stream from server?"); | |
236 } | |
237 (void)t.restart(); // reset timeout when we read anything | |
238 } | |
239 } | |
240 | |
241 return buffer; | |
242 } | |
243 | |
244 private: | |
245 LogCallback *m_logger; | |
246 MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently) | |
247 QProcess *m_process; // I own this | |
248 QMutex m_mutex; | |
249 bool m_crashed; | |
250 | |
251 void log(std::string message) const { | |
252 if (m_logger) m_logger->log(message); | |
253 else std::cerr << message << std::endl; | |
254 } | |
255 }; | |
256 | |
257 } | |
258 } | |
259 | |
260 #endif |