comparison vamp-client/client.cpp @ 90:6429a99abcad

Split out classes
author Chris Cannam <c.cannam@qmul.ac.uk>
date Thu, 13 Oct 2016 10:17:59 +0100
parents 03ed2e0a6c8f
children 21f8af53eaf0
comparison
equal deleted inserted replaced
89:03ed2e0a6c8f 90:6429a99abcad
1 1
2 #include "stub.h" 2 #include "PiperStubPlugin.h"
3 3 #include "CapnpMessageCompletenessChecker.h"
4 #include "vamp-capnp/VampnProto.h" 4 #include "PipedQProcessTransport.h"
5 5 #include "PiperCapnpClient.h"
6 #include "vamp-support/AssignedPluginHandleMapper.h"
7
8 #include <QProcess>
9 6
10 #include <stdexcept> 7 #include <stdexcept>
11 8
12 using std::cerr; 9 using std::cerr;
13 using std::endl; 10 using std::endl;
14 11
15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so
16 // we're using pipes and the server is completely synchronous,
17 // handling only one call at once. Our PiperClient will fire off a
18 // QProcess and refer to its io device. Each request message is
19 // serialised into memory using capnp::MallocMessageBuilder and
20 // shunted into the process pipe; we then wait for some bytes to come
21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when
22 // a whole message is available, reading only that amount from the
23 // device and using FlatArrayMessageReader to convert to a response
24 // message. If the response message's id does not match the request
25 // message's, then the server has gone wrong (it should never be
26 // servicing more than one request at a time).
27
28 // Next level: Capnp RPC, but I want to get the first level to work
29 // first, not least because the server already exists.
30
31 namespace piper { //!!! probably something different
32
33 class PiperClient : public PiperClientStubRequirements
34 {
35 // unsigned to avoid undefined behaviour on possible wrap
36 typedef uint32_t ReqId;
37
38 public:
39 PiperClient() {
40 m_process = new QProcess();
41 m_process->setReadChannel(QProcess::StandardOutput);
42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
43 m_process->start("../bin/piper-vamp-server"); //!!!
44 if (!m_process->waitForStarted()) {
45 cerr << "server failed to start" << endl;
46 delete m_process;
47 m_process = 0;
48 }
49 }
50
51 ~PiperClient() {
52 if (m_process) {
53 if (m_process->state() != QProcess::NotRunning) {
54 m_process->closeWriteChannel();
55 m_process->waitForFinished(200);
56 m_process->close();
57 m_process->waitForFinished();
58 cerr << "server exited" << endl;
59 }
60 delete m_process;
61 }
62 }
63
64 //!!! obviously, factor out all repetitive guff
65
66 //!!! list and load are supposed to be called by application code,
67 //!!! but the rest are only supposed to be called by the plugin --
68 //!!! sort out the api here
69
70 Vamp::Plugin *
71 load(std::string key, float inputSampleRate, int adapterFlags) {
72
73 if (!m_process) {
74 throw std::runtime_error("Piper server failed to start");
75 }
76
77 Vamp::HostExt::LoadRequest request;
78 request.pluginKey = key;
79 request.inputSampleRate = inputSampleRate;
80 request.adapterFlags = adapterFlags;
81
82 capnp::MallocMessageBuilder message;
83 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
84
85 VampnProto::buildRpcRequest_Load(builder, request);
86 ReqId id = getId();
87 builder.getId().setNumber(id);
88
89 auto arr = messageToFlatArray(message);
90 m_process->write(arr.asChars().begin(), arr.asChars().size());
91
92 //!!! ... --> will also need some way to kill this process
93 //!!! (from another thread)
94
95 QByteArray buffer = readResponseBuffer();
96 auto karr = toKJArray(buffer);
97 capnp::FlatArrayMessageReader responseMessage(karr);
98 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
99
100 //!!! handle (explicit) error case
101
102 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
103
104 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
105
106 Vamp::HostExt::PluginStaticData psd;
107 Vamp::HostExt::PluginConfiguration defaultConfig;
108 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
109 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
110
111 Vamp::Plugin *plugin = new PiperStubPlugin(this,
112 inputSampleRate,
113 psd,
114 defaultConfig);
115
116 m_mapper.addPlugin(lr.getHandle(), plugin);
117
118 return plugin;
119 };
120
121 virtual
122 Vamp::Plugin::OutputList
123 configure(PiperStubPlugin *plugin,
124 Vamp::HostExt::PluginConfiguration config) {
125
126 if (!m_process) {
127 throw std::runtime_error("Piper server failed to start");
128 }
129
130 Vamp::HostExt::ConfigurationRequest request;
131 request.plugin = plugin;
132 request.configuration = config;
133
134 capnp::MallocMessageBuilder message;
135 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
136
137 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
138 ReqId id = getId();
139 builder.getId().setNumber(id);
140
141 auto arr = messageToFlatArray(message);
142 m_process->write(arr.asChars().begin(), arr.asChars().size());
143
144 QByteArray buffer = readResponseBuffer();
145 auto karr = toKJArray(buffer);
146 capnp::FlatArrayMessageReader responseMessage(karr);
147 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
148
149 //!!! handle (explicit) error case
150
151 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
152
153 Vamp::HostExt::ConfigurationResponse cr;
154 VampnProto::readConfigurationResponse(cr,
155 reader.getResponse().getConfigure(),
156 m_mapper);
157
158 return cr.outputs;
159 };
160
161 virtual
162 Vamp::Plugin::FeatureSet
163 process(PiperStubPlugin *plugin,
164 std::vector<std::vector<float> > inputBuffers,
165 Vamp::RealTime timestamp) {
166
167 if (!m_process) {
168 throw std::runtime_error("Piper server failed to start");
169 }
170
171 Vamp::HostExt::ProcessRequest request;
172 request.plugin = plugin;
173 request.inputBuffers = inputBuffers;
174 request.timestamp = timestamp;
175
176 capnp::MallocMessageBuilder message;
177 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
178
179 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
180 ReqId id = getId();
181 builder.getId().setNumber(id);
182
183 auto arr = messageToFlatArray(message);
184 m_process->write(arr.asChars().begin(), arr.asChars().size());
185
186 QByteArray buffer = readResponseBuffer();
187 auto karr = toKJArray(buffer);
188 capnp::FlatArrayMessageReader responseMessage(karr);
189 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
190
191 //!!! handle (explicit) error case
192
193 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
194
195 Vamp::HostExt::ProcessResponse pr;
196 VampnProto::readProcessResponse(pr,
197 reader.getResponse().getProcess(),
198 m_mapper);
199
200 return pr.features;
201 }
202
203 virtual Vamp::Plugin::FeatureSet
204 finish(PiperStubPlugin *plugin) {
205
206 if (!m_process) {
207 throw std::runtime_error("Piper server failed to start");
208 }
209
210 Vamp::HostExt::FinishRequest request;
211 request.plugin = plugin;
212
213 capnp::MallocMessageBuilder message;
214 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
215
216 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
217 ReqId id = getId();
218 builder.getId().setNumber(id);
219
220 auto arr = messageToFlatArray(message);
221 m_process->write(arr.asChars().begin(), arr.asChars().size());
222
223 QByteArray buffer = readResponseBuffer();
224 auto karr = toKJArray(buffer);
225 capnp::FlatArrayMessageReader responseMessage(karr);
226 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
227
228 //!!! handle (explicit) error case
229
230 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
231
232 Vamp::HostExt::ProcessResponse pr;
233 VampnProto::readFinishResponse(pr,
234 reader.getResponse().getFinish(),
235 m_mapper);
236
237 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
238
239 // Don't delete the plugin. It's the plugin that is supposed
240 // to be calling us here
241
242 return pr.features;
243 }
244
245 private:
246 QProcess *m_process;
247 AssignedPluginHandleMapper m_mapper;
248 ReqId getId() {
249 //!!! todo: mutex
250 static ReqId m_nextId = 0;
251 return m_nextId++;
252 }
253
254 kj::Array<capnp::word>
255 toKJArray(QByteArray qarr) {
256 // We could do this whole thing with fewer copies, but let's
257 // see whether it matters first
258 size_t wordSize = sizeof(capnp::word);
259 size_t words = qarr.size() / wordSize;
260 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
261 memcpy(karr.begin(), qarr.data(), words * wordSize);
262 return karr;
263 }
264
265 QByteArray
266 readResponseBuffer() {
267
268 QByteArray buffer;
269 size_t wordSize = sizeof(capnp::word);
270 bool complete = false;
271
272 while (!complete) {
273
274 m_process->waitForReadyRead(1000);
275 qint64 byteCount = m_process->bytesAvailable();
276 qint64 wordCount = byteCount / wordSize;
277
278 if (!wordCount) {
279 if (m_process->state() == QProcess::NotRunning) {
280 cerr << "ERROR: Subprocess exited: Load failed" << endl;
281 throw std::runtime_error("Piper server exited unexpectedly");
282 }
283 } else {
284 buffer.append(m_process->read(wordCount * wordSize));
285 size_t haveWords = buffer.size() / wordSize;
286 size_t expectedWords =
287 capnp::expectedSizeInWordsFromPrefix(toKJArray(buffer));
288 if (haveWords >= expectedWords) {
289 if (haveWords > expectedWords) {
290 cerr << "WARNING: obtained more data than expected ("
291 << haveWords << " words, expected " << expectedWords
292 << ")" << endl;
293 }
294 complete = true;
295 }
296 }
297 }
298 /*
299 cerr << "buffer = ";
300 for (int i = 0; i < buffer.size(); ++i) {
301 if (i % 16 == 0) cerr << "\n";
302 cerr << int(buffer[i]) << " ";
303 }
304 cerr << "\n";
305 */
306 return buffer;
307 }
308
309 void
310 checkResponseType(const RpcResponse::Reader &r,
311 RpcResponse::Response::Which type,
312 ReqId id) {
313
314 if (r.getResponse().which() != type) {
315 throw std::runtime_error("Wrong response type");
316 }
317 if (ReqId(r.getId().getNumber()) != id) {
318 throw std::runtime_error("Wrong response id");
319 }
320 }
321 };
322
323 }
324
325 int main(int, char **) 12 int main(int, char **)
326 { 13 {
327 piper::PiperClient client; 14 piper::CapnpMessageCompletenessChecker checker;
15 piper::PipedQProcessTransport transport("../bin/piper-vamp-server", &checker);
16 piper::PiperCapnpClient client(&transport);
17
328 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0); 18 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
329 if (!plugin->initialise(1, 4, 4)) { 19 if (!plugin->initialise(1, 4, 4)) {
330 cerr << "initialisation failed" << endl; 20 cerr << "initialisation failed" << endl;
331 } else { 21 } else {
332 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 }; 22 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };