Mercurial > hg > piper-cpp
comparison vamp-client/client.cpp @ 90:6429a99abcad
Split out classes
author | Chris Cannam <c.cannam@qmul.ac.uk> |
---|---|
date | Thu, 13 Oct 2016 10:17:59 +0100 |
parents | 03ed2e0a6c8f |
children | 21f8af53eaf0 |
comparison
equal
deleted
inserted
replaced
89:03ed2e0a6c8f | 90:6429a99abcad |
---|---|
1 | 1 |
2 #include "stub.h" | 2 #include "PiperStubPlugin.h" |
3 | 3 #include "CapnpMessageCompletenessChecker.h" |
4 #include "vamp-capnp/VampnProto.h" | 4 #include "PipedQProcessTransport.h" |
5 | 5 #include "PiperCapnpClient.h" |
6 #include "vamp-support/AssignedPluginHandleMapper.h" | |
7 | |
8 #include <QProcess> | |
9 | 6 |
10 #include <stdexcept> | 7 #include <stdexcept> |
11 | 8 |
12 using std::cerr; | 9 using std::cerr; |
13 using std::endl; | 10 using std::endl; |
14 | 11 |
15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so | |
16 // we're using pipes and the server is completely synchronous, | |
17 // handling only one call at once. Our PiperClient will fire off a | |
18 // QProcess and refer to its io device. Each request message is | |
19 // serialised into memory using capnp::MallocMessageBuilder and | |
20 // shunted into the process pipe; we then wait for some bytes to come | |
21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when | |
22 // a whole message is available, reading only that amount from the | |
23 // device and using FlatArrayMessageReader to convert to a response | |
24 // message. If the response message's id does not match the request | |
25 // message's, then the server has gone wrong (it should never be | |
26 // servicing more than one request at a time). | |
27 | |
28 // Next level: Capnp RPC, but I want to get the first level to work | |
29 // first, not least because the server already exists. | |
30 | |
31 namespace piper { //!!! probably something different | |
32 | |
33 class PiperClient : public PiperClientStubRequirements | |
34 { | |
35 // unsigned to avoid undefined behaviour on possible wrap | |
36 typedef uint32_t ReqId; | |
37 | |
38 public: | |
39 PiperClient() { | |
40 m_process = new QProcess(); | |
41 m_process->setReadChannel(QProcess::StandardOutput); | |
42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel); | |
43 m_process->start("../bin/piper-vamp-server"); //!!! | |
44 if (!m_process->waitForStarted()) { | |
45 cerr << "server failed to start" << endl; | |
46 delete m_process; | |
47 m_process = 0; | |
48 } | |
49 } | |
50 | |
51 ~PiperClient() { | |
52 if (m_process) { | |
53 if (m_process->state() != QProcess::NotRunning) { | |
54 m_process->closeWriteChannel(); | |
55 m_process->waitForFinished(200); | |
56 m_process->close(); | |
57 m_process->waitForFinished(); | |
58 cerr << "server exited" << endl; | |
59 } | |
60 delete m_process; | |
61 } | |
62 } | |
63 | |
64 //!!! obviously, factor out all repetitive guff | |
65 | |
66 //!!! list and load are supposed to be called by application code, | |
67 //!!! but the rest are only supposed to be called by the plugin -- | |
68 //!!! sort out the api here | |
69 | |
70 Vamp::Plugin * | |
71 load(std::string key, float inputSampleRate, int adapterFlags) { | |
72 | |
73 if (!m_process) { | |
74 throw std::runtime_error("Piper server failed to start"); | |
75 } | |
76 | |
77 Vamp::HostExt::LoadRequest request; | |
78 request.pluginKey = key; | |
79 request.inputSampleRate = inputSampleRate; | |
80 request.adapterFlags = adapterFlags; | |
81 | |
82 capnp::MallocMessageBuilder message; | |
83 RpcRequest::Builder builder = message.initRoot<RpcRequest>(); | |
84 | |
85 VampnProto::buildRpcRequest_Load(builder, request); | |
86 ReqId id = getId(); | |
87 builder.getId().setNumber(id); | |
88 | |
89 auto arr = messageToFlatArray(message); | |
90 m_process->write(arr.asChars().begin(), arr.asChars().size()); | |
91 | |
92 //!!! ... --> will also need some way to kill this process | |
93 //!!! (from another thread) | |
94 | |
95 QByteArray buffer = readResponseBuffer(); | |
96 auto karr = toKJArray(buffer); | |
97 capnp::FlatArrayMessageReader responseMessage(karr); | |
98 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>(); | |
99 | |
100 //!!! handle (explicit) error case | |
101 | |
102 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id); | |
103 | |
104 const LoadResponse::Reader &lr = reader.getResponse().getLoad(); | |
105 | |
106 Vamp::HostExt::PluginStaticData psd; | |
107 Vamp::HostExt::PluginConfiguration defaultConfig; | |
108 VampnProto::readExtractorStaticData(psd, lr.getStaticData()); | |
109 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration()); | |
110 | |
111 Vamp::Plugin *plugin = new PiperStubPlugin(this, | |
112 inputSampleRate, | |
113 psd, | |
114 defaultConfig); | |
115 | |
116 m_mapper.addPlugin(lr.getHandle(), plugin); | |
117 | |
118 return plugin; | |
119 }; | |
120 | |
121 virtual | |
122 Vamp::Plugin::OutputList | |
123 configure(PiperStubPlugin *plugin, | |
124 Vamp::HostExt::PluginConfiguration config) { | |
125 | |
126 if (!m_process) { | |
127 throw std::runtime_error("Piper server failed to start"); | |
128 } | |
129 | |
130 Vamp::HostExt::ConfigurationRequest request; | |
131 request.plugin = plugin; | |
132 request.configuration = config; | |
133 | |
134 capnp::MallocMessageBuilder message; | |
135 RpcRequest::Builder builder = message.initRoot<RpcRequest>(); | |
136 | |
137 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper); | |
138 ReqId id = getId(); | |
139 builder.getId().setNumber(id); | |
140 | |
141 auto arr = messageToFlatArray(message); | |
142 m_process->write(arr.asChars().begin(), arr.asChars().size()); | |
143 | |
144 QByteArray buffer = readResponseBuffer(); | |
145 auto karr = toKJArray(buffer); | |
146 capnp::FlatArrayMessageReader responseMessage(karr); | |
147 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>(); | |
148 | |
149 //!!! handle (explicit) error case | |
150 | |
151 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id); | |
152 | |
153 Vamp::HostExt::ConfigurationResponse cr; | |
154 VampnProto::readConfigurationResponse(cr, | |
155 reader.getResponse().getConfigure(), | |
156 m_mapper); | |
157 | |
158 return cr.outputs; | |
159 }; | |
160 | |
161 virtual | |
162 Vamp::Plugin::FeatureSet | |
163 process(PiperStubPlugin *plugin, | |
164 std::vector<std::vector<float> > inputBuffers, | |
165 Vamp::RealTime timestamp) { | |
166 | |
167 if (!m_process) { | |
168 throw std::runtime_error("Piper server failed to start"); | |
169 } | |
170 | |
171 Vamp::HostExt::ProcessRequest request; | |
172 request.plugin = plugin; | |
173 request.inputBuffers = inputBuffers; | |
174 request.timestamp = timestamp; | |
175 | |
176 capnp::MallocMessageBuilder message; | |
177 RpcRequest::Builder builder = message.initRoot<RpcRequest>(); | |
178 | |
179 VampnProto::buildRpcRequest_Process(builder, request, m_mapper); | |
180 ReqId id = getId(); | |
181 builder.getId().setNumber(id); | |
182 | |
183 auto arr = messageToFlatArray(message); | |
184 m_process->write(arr.asChars().begin(), arr.asChars().size()); | |
185 | |
186 QByteArray buffer = readResponseBuffer(); | |
187 auto karr = toKJArray(buffer); | |
188 capnp::FlatArrayMessageReader responseMessage(karr); | |
189 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>(); | |
190 | |
191 //!!! handle (explicit) error case | |
192 | |
193 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id); | |
194 | |
195 Vamp::HostExt::ProcessResponse pr; | |
196 VampnProto::readProcessResponse(pr, | |
197 reader.getResponse().getProcess(), | |
198 m_mapper); | |
199 | |
200 return pr.features; | |
201 } | |
202 | |
203 virtual Vamp::Plugin::FeatureSet | |
204 finish(PiperStubPlugin *plugin) { | |
205 | |
206 if (!m_process) { | |
207 throw std::runtime_error("Piper server failed to start"); | |
208 } | |
209 | |
210 Vamp::HostExt::FinishRequest request; | |
211 request.plugin = plugin; | |
212 | |
213 capnp::MallocMessageBuilder message; | |
214 RpcRequest::Builder builder = message.initRoot<RpcRequest>(); | |
215 | |
216 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper); | |
217 ReqId id = getId(); | |
218 builder.getId().setNumber(id); | |
219 | |
220 auto arr = messageToFlatArray(message); | |
221 m_process->write(arr.asChars().begin(), arr.asChars().size()); | |
222 | |
223 QByteArray buffer = readResponseBuffer(); | |
224 auto karr = toKJArray(buffer); | |
225 capnp::FlatArrayMessageReader responseMessage(karr); | |
226 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>(); | |
227 | |
228 //!!! handle (explicit) error case | |
229 | |
230 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id); | |
231 | |
232 Vamp::HostExt::ProcessResponse pr; | |
233 VampnProto::readFinishResponse(pr, | |
234 reader.getResponse().getFinish(), | |
235 m_mapper); | |
236 | |
237 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin)); | |
238 | |
239 // Don't delete the plugin. It's the plugin that is supposed | |
240 // to be calling us here | |
241 | |
242 return pr.features; | |
243 } | |
244 | |
245 private: | |
246 QProcess *m_process; | |
247 AssignedPluginHandleMapper m_mapper; | |
248 ReqId getId() { | |
249 //!!! todo: mutex | |
250 static ReqId m_nextId = 0; | |
251 return m_nextId++; | |
252 } | |
253 | |
254 kj::Array<capnp::word> | |
255 toKJArray(QByteArray qarr) { | |
256 // We could do this whole thing with fewer copies, but let's | |
257 // see whether it matters first | |
258 size_t wordSize = sizeof(capnp::word); | |
259 size_t words = qarr.size() / wordSize; | |
260 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words)); | |
261 memcpy(karr.begin(), qarr.data(), words * wordSize); | |
262 return karr; | |
263 } | |
264 | |
265 QByteArray | |
266 readResponseBuffer() { | |
267 | |
268 QByteArray buffer; | |
269 size_t wordSize = sizeof(capnp::word); | |
270 bool complete = false; | |
271 | |
272 while (!complete) { | |
273 | |
274 m_process->waitForReadyRead(1000); | |
275 qint64 byteCount = m_process->bytesAvailable(); | |
276 qint64 wordCount = byteCount / wordSize; | |
277 | |
278 if (!wordCount) { | |
279 if (m_process->state() == QProcess::NotRunning) { | |
280 cerr << "ERROR: Subprocess exited: Load failed" << endl; | |
281 throw std::runtime_error("Piper server exited unexpectedly"); | |
282 } | |
283 } else { | |
284 buffer.append(m_process->read(wordCount * wordSize)); | |
285 size_t haveWords = buffer.size() / wordSize; | |
286 size_t expectedWords = | |
287 capnp::expectedSizeInWordsFromPrefix(toKJArray(buffer)); | |
288 if (haveWords >= expectedWords) { | |
289 if (haveWords > expectedWords) { | |
290 cerr << "WARNING: obtained more data than expected (" | |
291 << haveWords << " words, expected " << expectedWords | |
292 << ")" << endl; | |
293 } | |
294 complete = true; | |
295 } | |
296 } | |
297 } | |
298 /* | |
299 cerr << "buffer = "; | |
300 for (int i = 0; i < buffer.size(); ++i) { | |
301 if (i % 16 == 0) cerr << "\n"; | |
302 cerr << int(buffer[i]) << " "; | |
303 } | |
304 cerr << "\n"; | |
305 */ | |
306 return buffer; | |
307 } | |
308 | |
309 void | |
310 checkResponseType(const RpcResponse::Reader &r, | |
311 RpcResponse::Response::Which type, | |
312 ReqId id) { | |
313 | |
314 if (r.getResponse().which() != type) { | |
315 throw std::runtime_error("Wrong response type"); | |
316 } | |
317 if (ReqId(r.getId().getNumber()) != id) { | |
318 throw std::runtime_error("Wrong response id"); | |
319 } | |
320 } | |
321 }; | |
322 | |
323 } | |
324 | |
325 int main(int, char **) | 12 int main(int, char **) |
326 { | 13 { |
327 piper::PiperClient client; | 14 piper::CapnpMessageCompletenessChecker checker; |
15 piper::PipedQProcessTransport transport("../bin/piper-vamp-server", &checker); | |
16 piper::PiperCapnpClient client(&transport); | |
17 | |
328 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0); | 18 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0); |
329 if (!plugin->initialise(1, 4, 4)) { | 19 if (!plugin->initialise(1, 4, 4)) { |
330 cerr << "initialisation failed" << endl; | 20 cerr << "initialisation failed" << endl; |
331 } else { | 21 } else { |
332 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 }; | 22 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 }; |