c@78
|
1
|
c@78
|
2 #include "stub.h"
|
c@78
|
3
|
c@80
|
4 #include "vamp-capnp/VampnProto.h"
|
c@80
|
5
|
c@80
|
6 #include "vamp-support/AssignedPluginHandleMapper.h"
|
c@80
|
7
|
c@83
|
8 #include <QProcess>
|
c@83
|
9
|
c@83
|
10 #include <stdexcept>
|
c@83
|
11
|
c@83
|
12 using std::cerr;
|
c@83
|
13 using std::endl;
|
c@83
|
14
|
c@82
|
15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so
|
c@82
|
16 // we're using pipes and the server is completely synchronous,
|
c@82
|
17 // handling only one call at once. Our PiperClient will fire off a
|
c@82
|
18 // QProcess and refer to its io device. Each request message is
|
c@82
|
19 // serialised into memory using capnp::MallocMessageBuilder and
|
c@82
|
20 // shunted into the process pipe; we then wait for some bytes to come
|
c@82
|
21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when
|
c@82
|
22 // a whole message is available, reading only that amount from the
|
c@82
|
23 // device and using FlatArrayMessageReader to convert to a response
|
c@82
|
24 // message. If the response message's id does not match the request
|
c@82
|
25 // message's, then the server has gone wrong (it should never be
|
c@82
|
26 // servicing more than one request at a time).
|
c@82
|
27
|
c@82
|
28 // Next level: Capnp RPC, but I want to get the first level to work
|
c@83
|
29 // first, not least because the server already exists.
|
c@82
|
30
|
c@81
|
31 namespace piper { //!!! probably something different
|
c@80
|
32
|
c@88
|
33 class PiperClient : public PiperClientStubRequirements
|
c@80
|
34 {
|
c@81
|
35 // unsigned to avoid undefined behaviour on possible wrap
|
c@81
|
36 typedef uint32_t ReqId;
|
c@81
|
37
|
c@80
|
38 public:
|
c@83
|
39 PiperClient() {
|
c@83
|
40 m_process = new QProcess();
|
c@83
|
41 m_process->setReadChannel(QProcess::StandardOutput);
|
c@83
|
42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
|
c@83
|
43 m_process->start("../bin/piper-vamp-server"); //!!!
|
c@83
|
44 if (!m_process->waitForStarted()) {
|
c@83
|
45 cerr << "server failed to start" << endl;
|
c@83
|
46 delete m_process;
|
c@83
|
47 m_process = 0;
|
c@83
|
48 }
|
c@83
|
49 }
|
c@81
|
50
|
c@83
|
51 ~PiperClient() {
|
c@83
|
52 if (m_process) {
|
c@83
|
53 if (m_process->state() != QProcess::NotRunning) {
|
c@89
|
54 m_process->closeWriteChannel();
|
c@89
|
55 m_process->waitForFinished(200);
|
c@83
|
56 m_process->close();
|
c@83
|
57 m_process->waitForFinished();
|
c@89
|
58 cerr << "server exited" << endl;
|
c@83
|
59 }
|
c@83
|
60 delete m_process;
|
c@83
|
61 }
|
c@83
|
62 }
|
c@81
|
63
|
c@83
|
64 //!!! obviously, factor out all repetitive guff
|
c@84
|
65
|
c@88
|
66 //!!! list and load are supposed to be called by application code,
|
c@88
|
67 //!!! but the rest are only supposed to be called by the plugin --
|
c@88
|
68 //!!! sort out the api here
|
c@88
|
69
|
c@81
|
70 Vamp::Plugin *
|
c@81
|
71 load(std::string key, float inputSampleRate, int adapterFlags) {
|
c@81
|
72
|
c@83
|
73 if (!m_process) {
|
c@83
|
74 throw std::runtime_error("Piper server failed to start");
|
c@83
|
75 }
|
c@83
|
76
|
c@81
|
77 Vamp::HostExt::LoadRequest request;
|
c@81
|
78 request.pluginKey = key;
|
c@81
|
79 request.inputSampleRate = inputSampleRate;
|
c@81
|
80 request.adapterFlags = adapterFlags;
|
c@81
|
81
|
c@84
|
82 capnp::MallocMessageBuilder message;
|
c@81
|
83 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
|
c@81
|
84
|
c@81
|
85 VampnProto::buildRpcRequest_Load(builder, request);
|
c@81
|
86 ReqId id = getId();
|
c@81
|
87 builder.getId().setNumber(id);
|
c@83
|
88
|
c@83
|
89 auto arr = messageToFlatArray(message);
|
c@83
|
90 m_process->write(arr.asChars().begin(), arr.asChars().size());
|
c@83
|
91
|
c@84
|
92 //!!! ... --> will also need some way to kill this process
|
c@84
|
93 //!!! (from another thread)
|
c@84
|
94
|
c@84
|
95 QByteArray buffer = readResponseBuffer();
|
c@87
|
96 auto karr = toKJArray(buffer);
|
c@87
|
97 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@84
|
98 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
|
c@84
|
99
|
c@84
|
100 //!!! handle (explicit) error case
|
c@84
|
101
|
c@84
|
102 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
|
c@84
|
103
|
c@84
|
104 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
|
c@84
|
105
|
c@84
|
106 Vamp::HostExt::PluginStaticData psd;
|
c@84
|
107 Vamp::HostExt::PluginConfiguration defaultConfig;
|
c@84
|
108 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
|
c@84
|
109 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
|
c@84
|
110
|
c@84
|
111 Vamp::Plugin *plugin = new PiperStubPlugin(this,
|
c@84
|
112 inputSampleRate,
|
c@84
|
113 psd,
|
c@84
|
114 defaultConfig);
|
c@84
|
115
|
c@84
|
116 m_mapper.addPlugin(lr.getHandle(), plugin);
|
c@84
|
117
|
c@84
|
118 return plugin;
|
c@81
|
119 };
|
c@80
|
120
|
c@80
|
121 virtual
|
c@80
|
122 Vamp::Plugin::OutputList
|
c@80
|
123 configure(PiperStubPlugin *plugin,
|
c@80
|
124 Vamp::HostExt::PluginConfiguration config) {
|
c@80
|
125
|
c@83
|
126 if (!m_process) {
|
c@83
|
127 throw std::runtime_error("Piper server failed to start");
|
c@83
|
128 }
|
c@83
|
129
|
c@80
|
130 Vamp::HostExt::ConfigurationRequest request;
|
c@80
|
131 request.plugin = plugin;
|
c@80
|
132 request.configuration = config;
|
c@80
|
133
|
c@84
|
134 capnp::MallocMessageBuilder message;
|
c@80
|
135 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
|
c@80
|
136
|
c@80
|
137 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
|
c@81
|
138 ReqId id = getId();
|
c@81
|
139 builder.getId().setNumber(id);
|
c@84
|
140
|
c@84
|
141 auto arr = messageToFlatArray(message);
|
c@84
|
142 m_process->write(arr.asChars().begin(), arr.asChars().size());
|
c@84
|
143
|
c@84
|
144 QByteArray buffer = readResponseBuffer();
|
c@87
|
145 auto karr = toKJArray(buffer);
|
c@87
|
146 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@84
|
147 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
|
c@80
|
148
|
c@84
|
149 //!!! handle (explicit) error case
|
c@84
|
150
|
c@84
|
151 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
|
c@84
|
152
|
c@84
|
153 Vamp::HostExt::ConfigurationResponse cr;
|
c@84
|
154 VampnProto::readConfigurationResponse(cr,
|
c@84
|
155 reader.getResponse().getConfigure(),
|
c@84
|
156 m_mapper);
|
c@84
|
157
|
c@84
|
158 return cr.outputs;
|
c@81
|
159 };
|
c@80
|
160
|
c@80
|
161 virtual
|
c@80
|
162 Vamp::Plugin::FeatureSet
|
c@80
|
163 process(PiperStubPlugin *plugin,
|
c@84
|
164 std::vector<std::vector<float> > inputBuffers,
|
c@84
|
165 Vamp::RealTime timestamp) {
|
c@84
|
166
|
c@84
|
167 if (!m_process) {
|
c@84
|
168 throw std::runtime_error("Piper server failed to start");
|
c@84
|
169 }
|
c@84
|
170
|
c@84
|
171 Vamp::HostExt::ProcessRequest request;
|
c@84
|
172 request.plugin = plugin;
|
c@84
|
173 request.inputBuffers = inputBuffers;
|
c@84
|
174 request.timestamp = timestamp;
|
c@84
|
175
|
c@84
|
176 capnp::MallocMessageBuilder message;
|
c@84
|
177 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
|
c@84
|
178
|
c@84
|
179 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
|
c@84
|
180 ReqId id = getId();
|
c@84
|
181 builder.getId().setNumber(id);
|
c@84
|
182
|
c@84
|
183 auto arr = messageToFlatArray(message);
|
c@84
|
184 m_process->write(arr.asChars().begin(), arr.asChars().size());
|
c@84
|
185
|
c@84
|
186 QByteArray buffer = readResponseBuffer();
|
c@87
|
187 auto karr = toKJArray(buffer);
|
c@87
|
188 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@84
|
189 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
|
c@84
|
190
|
c@84
|
191 //!!! handle (explicit) error case
|
c@84
|
192
|
c@84
|
193 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
|
c@84
|
194
|
c@84
|
195 Vamp::HostExt::ProcessResponse pr;
|
c@84
|
196 VampnProto::readProcessResponse(pr,
|
c@84
|
197 reader.getResponse().getProcess(),
|
c@84
|
198 m_mapper);
|
c@84
|
199
|
c@84
|
200 return pr.features;
|
c@84
|
201 }
|
c@80
|
202
|
c@80
|
203 virtual Vamp::Plugin::FeatureSet
|
c@84
|
204 finish(PiperStubPlugin *plugin) {
|
c@84
|
205
|
c@84
|
206 if (!m_process) {
|
c@84
|
207 throw std::runtime_error("Piper server failed to start");
|
c@84
|
208 }
|
c@84
|
209
|
c@84
|
210 Vamp::HostExt::FinishRequest request;
|
c@84
|
211 request.plugin = plugin;
|
c@84
|
212
|
c@84
|
213 capnp::MallocMessageBuilder message;
|
c@84
|
214 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
|
c@84
|
215
|
c@84
|
216 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
|
c@84
|
217 ReqId id = getId();
|
c@84
|
218 builder.getId().setNumber(id);
|
c@84
|
219
|
c@84
|
220 auto arr = messageToFlatArray(message);
|
c@84
|
221 m_process->write(arr.asChars().begin(), arr.asChars().size());
|
c@84
|
222
|
c@84
|
223 QByteArray buffer = readResponseBuffer();
|
c@87
|
224 auto karr = toKJArray(buffer);
|
c@87
|
225 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@84
|
226 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
|
c@84
|
227
|
c@84
|
228 //!!! handle (explicit) error case
|
c@84
|
229
|
c@84
|
230 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
|
c@84
|
231
|
c@84
|
232 Vamp::HostExt::ProcessResponse pr;
|
c@84
|
233 VampnProto::readFinishResponse(pr,
|
c@84
|
234 reader.getResponse().getFinish(),
|
c@84
|
235 m_mapper);
|
c@84
|
236
|
c@84
|
237 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
|
c@88
|
238
|
c@88
|
239 // Don't delete the plugin. It's the plugin that is supposed
|
c@88
|
240 // to be calling us here
|
c@84
|
241
|
c@84
|
242 return pr.features;
|
c@84
|
243 }
|
c@80
|
244
|
c@80
|
245 private:
|
c@83
|
246 QProcess *m_process;
|
c@80
|
247 AssignedPluginHandleMapper m_mapper;
|
c@84
|
248 ReqId getId() {
|
c@81
|
249 //!!! todo: mutex
|
c@81
|
250 static ReqId m_nextId = 0;
|
c@81
|
251 return m_nextId++;
|
c@81
|
252 }
|
c@84
|
253
|
c@87
|
254 kj::Array<capnp::word>
|
c@87
|
255 toKJArray(QByteArray qarr) {
|
c@87
|
256 // We could do this whole thing with fewer copies, but let's
|
c@87
|
257 // see whether it matters first
|
c@84
|
258 size_t wordSize = sizeof(capnp::word);
|
c@87
|
259 size_t words = qarr.size() / wordSize;
|
c@87
|
260 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
|
c@87
|
261 memcpy(karr.begin(), qarr.data(), words * wordSize);
|
c@87
|
262 return karr;
|
c@84
|
263 }
|
c@84
|
264
|
c@84
|
265 QByteArray
|
c@84
|
266 readResponseBuffer() {
|
c@84
|
267
|
c@84
|
268 QByteArray buffer;
|
c@84
|
269 size_t wordSize = sizeof(capnp::word);
|
c@84
|
270 bool complete = false;
|
c@84
|
271
|
c@84
|
272 while (!complete) {
|
c@84
|
273
|
c@84
|
274 m_process->waitForReadyRead(1000);
|
c@84
|
275 qint64 byteCount = m_process->bytesAvailable();
|
c@84
|
276 qint64 wordCount = byteCount / wordSize;
|
c@84
|
277
|
c@84
|
278 if (!wordCount) {
|
c@84
|
279 if (m_process->state() == QProcess::NotRunning) {
|
c@84
|
280 cerr << "ERROR: Subprocess exited: Load failed" << endl;
|
c@84
|
281 throw std::runtime_error("Piper server exited unexpectedly");
|
c@84
|
282 }
|
c@84
|
283 } else {
|
c@84
|
284 buffer.append(m_process->read(wordCount * wordSize));
|
c@84
|
285 size_t haveWords = buffer.size() / wordSize;
|
c@84
|
286 size_t expectedWords =
|
c@87
|
287 capnp::expectedSizeInWordsFromPrefix(toKJArray(buffer));
|
c@84
|
288 if (haveWords >= expectedWords) {
|
c@84
|
289 if (haveWords > expectedWords) {
|
c@84
|
290 cerr << "WARNING: obtained more data than expected ("
|
c@84
|
291 << haveWords << " words, expected " << expectedWords
|
c@84
|
292 << ")" << endl;
|
c@84
|
293 }
|
c@84
|
294 complete = true;
|
c@84
|
295 }
|
c@84
|
296 }
|
c@84
|
297 }
|
c@87
|
298 /*
|
c@85
|
299 cerr << "buffer = ";
|
c@85
|
300 for (int i = 0; i < buffer.size(); ++i) {
|
c@85
|
301 if (i % 16 == 0) cerr << "\n";
|
c@85
|
302 cerr << int(buffer[i]) << " ";
|
c@85
|
303 }
|
c@85
|
304 cerr << "\n";
|
c@87
|
305 */
|
c@84
|
306 return buffer;
|
c@84
|
307 }
|
c@84
|
308
|
c@84
|
309 void
|
c@84
|
310 checkResponseType(const RpcResponse::Reader &r,
|
c@84
|
311 RpcResponse::Response::Which type,
|
c@84
|
312 ReqId id) {
|
c@84
|
313
|
c@84
|
314 if (r.getResponse().which() != type) {
|
c@84
|
315 throw std::runtime_error("Wrong response type");
|
c@84
|
316 }
|
c@84
|
317 if (ReqId(r.getId().getNumber()) != id) {
|
c@84
|
318 throw std::runtime_error("Wrong response id");
|
c@84
|
319 }
|
c@84
|
320 }
|
c@80
|
321 };
|
c@80
|
322
|
c@80
|
323 }
|
c@80
|
324
|
c@84
|
325 int main(int, char **)
|
c@84
|
326 {
|
c@84
|
327 piper::PiperClient client;
|
c@84
|
328 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
|
c@84
|
329 if (!plugin->initialise(1, 4, 4)) {
|
c@84
|
330 cerr << "initialisation failed" << endl;
|
c@84
|
331 } else {
|
c@84
|
332 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
|
c@84
|
333 float *bd = buf.data();
|
c@84
|
334 Vamp::Plugin::FeatureSet features = plugin->process
|
c@84
|
335 (&bd, Vamp::RealTime::zeroTime);
|
c@84
|
336 cerr << "results for output 0:" << endl;
|
c@84
|
337 auto fl(features[0]);
|
c@84
|
338 for (const auto &f: fl) {
|
c@84
|
339 cerr << f.values[0] << endl;
|
c@84
|
340 }
|
c@84
|
341 }
|
c@88
|
342
|
c@87
|
343 (void)plugin->getRemainingFeatures();
|
c@89
|
344
|
c@89
|
345 cerr << "calling reset..." << endl;
|
c@89
|
346 plugin->reset();
|
c@89
|
347 cerr << "...round 2!" << endl;
|
c@89
|
348
|
c@89
|
349 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
|
c@89
|
350 float *bd = buf.data();
|
c@89
|
351 Vamp::Plugin::FeatureSet features = plugin->process
|
c@89
|
352 (&bd, Vamp::RealTime::zeroTime);
|
c@89
|
353 cerr << "results for output 0:" << endl;
|
c@89
|
354 auto fl(features[0]);
|
c@89
|
355 for (const auto &f: fl) {
|
c@89
|
356 cerr << f.values[0] << endl;
|
c@89
|
357 }
|
c@89
|
358
|
c@89
|
359 (void)plugin->getRemainingFeatures();
|
c@89
|
360
|
c@88
|
361 delete plugin;
|
c@87
|
362 //!!! -- and also implement reset(), which will need to reconstruct internally
|
c@84
|
363 }
|
c@84
|
364
|