cannam@111
|
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
|
c@118
|
2 /*
|
c@118
|
3 Piper C++
|
c@118
|
4
|
c@118
|
5 An API for audio analysis and feature extraction plugins.
|
c@118
|
6
|
c@118
|
7 Centre for Digital Music, Queen Mary, University of London.
|
c@118
|
8 Copyright 2006-2016 Chris Cannam and QMUL.
|
c@118
|
9
|
c@118
|
10 Permission is hereby granted, free of charge, to any person
|
c@118
|
11 obtaining a copy of this software and associated documentation
|
c@118
|
12 files (the "Software"), to deal in the Software without
|
c@118
|
13 restriction, including without limitation the rights to use, copy,
|
c@118
|
14 modify, merge, publish, distribute, sublicense, and/or sell copies
|
c@118
|
15 of the Software, and to permit persons to whom the Software is
|
c@118
|
16 furnished to do so, subject to the following conditions:
|
c@118
|
17
|
c@118
|
18 The above copyright notice and this permission notice shall be
|
c@118
|
19 included in all copies or substantial portions of the Software.
|
c@118
|
20
|
c@118
|
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
c@118
|
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
c@118
|
23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
c@118
|
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
|
c@118
|
25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
|
c@118
|
26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
c@118
|
27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
c@118
|
28
|
c@118
|
29 Except as contained in this notice, the names of the Centre for
|
c@118
|
30 Digital Music; Queen Mary, University of London; and Chris Cannam
|
c@118
|
31 shall not be used in advertising or otherwise to promote the sale,
|
c@118
|
32 use or other dealings in this Software without prior written
|
c@118
|
33 authorization.
|
c@118
|
34 */
|
c@96
|
35
|
c@96
|
36 #ifndef PIPER_CAPNP_CLIENT_H
|
c@96
|
37 #define PIPER_CAPNP_CLIENT_H
|
c@96
|
38
|
c@96
|
#include "Loader.h"
#include "PluginClient.h"
#include "PluginStub.h"
#include "SynchronousTransport.h"

#include "vamp-support/AssignedPluginHandleMapper.h"
#include "vamp-capnp/VampnProto.h"

#include <capnp/serialize.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
|
c@96
|
48
|
c@97
|
49 namespace piper_vamp {
|
c@97
|
50 namespace client {
|
c@96
|
51
|
c@100
|
52 /**
|
c@100
|
53 * Client for a request-response Piper server, i.e. using the
|
c@100
|
54 * RpcRequest/RpcResponse structures with a single process call rather
|
c@100
|
55 * than having individual RPC methods, with a synchronous transport
|
c@100
|
56 * such as a subprocess pipe arrangement. Only one request can be
|
c@100
|
57 * handled at a time. This class is thread-safe if and only if it is
|
c@100
|
58 * constructed with a thread-safe SynchronousTransport implementation.
|
c@100
|
59 */
|
c@96
|
60 class CapnpRRClient : public PluginClient,
|
c@118
|
61 public Loader
|
c@96
|
62 {
|
c@96
|
63 // unsigned to avoid undefined behaviour on possible wrap
|
c@96
|
64 typedef uint32_t ReqId;
|
c@96
|
65
|
c@96
|
66 class CompletenessChecker : public MessageCompletenessChecker {
|
c@96
|
67 public:
|
c@96
|
68 bool isComplete(const std::vector<char> &message) const override {
|
c@96
|
69 auto karr = toKJArray(message);
|
c@96
|
70 size_t words = karr.size();
|
c@96
|
71 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
|
c@96
|
72 if (words > expected) {
|
c@96
|
73 std::cerr << "WARNING: obtained more data than expected ("
|
c@96
|
74 << words << " " << sizeof(capnp::word)
|
c@96
|
75 << "-byte words, expected "
|
c@96
|
76 << expected << ")" << std::endl;
|
c@96
|
77 }
|
c@96
|
78 return words >= expected;
|
c@96
|
79 }
|
c@96
|
80 };
|
c@96
|
81
|
c@96
|
82 public:
|
c@96
|
83 CapnpRRClient(SynchronousTransport *transport) : //!!! ownership? shared ptr?
|
c@96
|
84 m_transport(transport),
|
c@96
|
85 m_completenessChecker(new CompletenessChecker) {
|
c@96
|
86 transport->setCompletenessChecker(m_completenessChecker);
|
c@96
|
87 }
|
c@96
|
88
|
c@96
|
89 ~CapnpRRClient() {
|
c@96
|
90 delete m_completenessChecker;
|
c@96
|
91 }
|
c@96
|
92
|
c@96
|
93 //!!! obviously, factor out all repetitive guff
|
c@96
|
94
|
c@96
|
95 //!!! list and load are supposed to be called by application code,
|
c@96
|
96 //!!! but the rest are only supposed to be called by the plugin --
|
c@96
|
97 //!!! sort out the api here
|
c@96
|
98
|
c@96
|
99 // Loader methods:
|
c@96
|
100
|
c@97
|
101 ListResponse
|
c@96
|
102 listPluginData() override {
|
c@96
|
103
|
c@96
|
104 if (!m_transport->isOK()) {
|
c@126
|
105 throw std::runtime_error("Piper server crashed or failed to start");
|
c@96
|
106 }
|
c@96
|
107
|
c@96
|
108 capnp::MallocMessageBuilder message;
|
c@118
|
109 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
|
c@96
|
110 VampnProto::buildRpcRequest_List(builder);
|
c@96
|
111 ReqId id = getId();
|
c@96
|
112 builder.getId().setNumber(id);
|
c@96
|
113
|
c@126
|
114 auto karr = call(message, true);
|
c@96
|
115
|
c@96
|
116 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@97
|
117 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
|
c@96
|
118
|
c@97
|
119 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
|
c@96
|
120
|
c@97
|
121 ListResponse lr;
|
c@96
|
122 VampnProto::readListResponse(lr, reader.getResponse().getList());
|
c@96
|
123 return lr;
|
c@96
|
124 }
|
c@96
|
125
|
c@97
|
126 LoadResponse
|
c@97
|
127 loadPlugin(const LoadRequest &req) override {
|
c@96
|
128
|
c@96
|
129 if (!m_transport->isOK()) {
|
c@126
|
130 throw std::runtime_error("Piper server crashed or failed to start");
|
c@96
|
131 }
|
c@96
|
132
|
c@97
|
133 LoadResponse resp;
|
c@96
|
134 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
|
c@96
|
135 req.inputSampleRate,
|
c@96
|
136 req.adapterFlags,
|
c@96
|
137 resp.staticData,
|
c@96
|
138 resp.defaultConfiguration);
|
c@96
|
139
|
c@96
|
140 Vamp::Plugin *plugin = new PluginStub(this,
|
c@96
|
141 req.pluginKey,
|
c@96
|
142 req.inputSampleRate,
|
c@96
|
143 req.adapterFlags,
|
c@96
|
144 resp.staticData,
|
c@96
|
145 resp.defaultConfiguration);
|
c@96
|
146
|
c@96
|
147 m_mapper.addPlugin(handle, plugin);
|
c@96
|
148
|
c@96
|
149 resp.plugin = plugin;
|
c@96
|
150 return resp;
|
c@96
|
151 }
|
c@96
|
152
|
c@96
|
153 // PluginClient methods:
|
c@96
|
154
|
c@96
|
155 virtual
|
c@96
|
156 Vamp::Plugin::OutputList
|
c@96
|
157 configure(PluginStub *plugin,
|
c@97
|
158 PluginConfiguration config) override {
|
c@96
|
159
|
c@96
|
160 if (!m_transport->isOK()) {
|
c@126
|
161 throw std::runtime_error("Piper server crashed or failed to start");
|
c@96
|
162 }
|
c@96
|
163
|
c@97
|
164 ConfigurationRequest request;
|
c@96
|
165 request.plugin = plugin;
|
c@96
|
166 request.configuration = config;
|
c@96
|
167
|
c@96
|
168 capnp::MallocMessageBuilder message;
|
c@97
|
169 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
|
c@96
|
170
|
c@96
|
171 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
|
c@96
|
172 ReqId id = getId();
|
c@96
|
173 builder.getId().setNumber(id);
|
c@96
|
174
|
c@126
|
175 auto karr = call(message, true);
|
c@96
|
176
|
c@96
|
177 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@97
|
178 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
|
c@96
|
179
|
c@96
|
180 //!!! handle (explicit) error case
|
c@96
|
181
|
c@97
|
182 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
|
c@96
|
183
|
c@97
|
184 ConfigurationResponse cr;
|
c@96
|
185 VampnProto::readConfigurationResponse(cr,
|
c@96
|
186 reader.getResponse().getConfigure(),
|
c@96
|
187 m_mapper);
|
c@96
|
188
|
c@96
|
189 return cr.outputs;
|
c@96
|
190 };
|
c@96
|
191
|
c@96
|
192 virtual
|
c@96
|
193 Vamp::Plugin::FeatureSet
|
c@96
|
194 process(PluginStub *plugin,
|
c@96
|
195 std::vector<std::vector<float> > inputBuffers,
|
c@96
|
196 Vamp::RealTime timestamp) override {
|
c@96
|
197
|
c@96
|
198 if (!m_transport->isOK()) {
|
c@126
|
199 throw std::runtime_error("Piper server crashed or failed to start");
|
c@96
|
200 }
|
c@96
|
201
|
c@97
|
202 ProcessRequest request;
|
c@96
|
203 request.plugin = plugin;
|
c@96
|
204 request.inputBuffers = inputBuffers;
|
c@96
|
205 request.timestamp = timestamp;
|
c@96
|
206
|
c@96
|
207 capnp::MallocMessageBuilder message;
|
c@97
|
208 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
|
c@96
|
209 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
|
c@118
|
210 ReqId id = getId();
|
c@96
|
211 builder.getId().setNumber(id);
|
c@96
|
212
|
c@126
|
213 auto karr = call(message, false);
|
c@96
|
214
|
c@96
|
215 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@97
|
216 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
|
c@96
|
217
|
c@96
|
218 //!!! handle (explicit) error case
|
c@96
|
219
|
c@97
|
220 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
|
c@96
|
221
|
c@97
|
222 ProcessResponse pr;
|
c@96
|
223 VampnProto::readProcessResponse(pr,
|
c@96
|
224 reader.getResponse().getProcess(),
|
c@96
|
225 m_mapper);
|
c@96
|
226
|
c@96
|
227 return pr.features;
|
c@96
|
228 }
|
c@96
|
229
|
c@96
|
230 virtual Vamp::Plugin::FeatureSet
|
c@96
|
231 finish(PluginStub *plugin) override {
|
c@96
|
232
|
c@96
|
233 if (!m_transport->isOK()) {
|
c@126
|
234 throw std::runtime_error("Piper server crashed or failed to start");
|
c@96
|
235 }
|
c@96
|
236
|
c@97
|
237 FinishRequest request;
|
c@96
|
238 request.plugin = plugin;
|
c@96
|
239
|
c@96
|
240 capnp::MallocMessageBuilder message;
|
c@97
|
241 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
|
c@96
|
242
|
c@96
|
243 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
|
c@96
|
244 ReqId id = getId();
|
c@96
|
245 builder.getId().setNumber(id);
|
c@96
|
246
|
c@126
|
247 auto karr = call(message, true);
|
c@96
|
248
|
c@96
|
249 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@97
|
250 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
|
c@96
|
251
|
c@96
|
252 //!!! handle (explicit) error case
|
c@96
|
253
|
c@97
|
254 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
|
c@96
|
255
|
c@97
|
256 FinishResponse pr;
|
c@96
|
257 VampnProto::readFinishResponse(pr,
|
c@96
|
258 reader.getResponse().getFinish(),
|
c@96
|
259 m_mapper);
|
c@96
|
260
|
c@96
|
261 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
|
c@96
|
262
|
c@118
|
263 // Don't delete the plugin. It's the plugin that is supposed
|
c@118
|
264 // to be calling us here
|
c@96
|
265
|
c@96
|
266 return pr.features;
|
c@96
|
267 }
|
c@96
|
268
|
c@96
|
269 virtual void
|
c@96
|
270 reset(PluginStub *plugin,
|
c@97
|
271 PluginConfiguration config) override {
|
c@96
|
272
|
c@96
|
273 // Reload the plugin on the server side, and configure it as requested
|
c@96
|
274
|
c@96
|
275 if (!m_transport->isOK()) {
|
c@126
|
276 throw std::runtime_error("Piper server crashed or failed to start");
|
c@96
|
277 }
|
c@96
|
278
|
c@96
|
279 if (m_mapper.havePlugin(plugin)) {
|
c@96
|
280 (void)finish(plugin); // server-side unload
|
c@96
|
281 }
|
c@96
|
282
|
c@97
|
283 PluginStaticData psd;
|
c@97
|
284 PluginConfiguration defaultConfig;
|
c@96
|
285 PluginHandleMapper::Handle handle =
|
c@96
|
286 serverLoad(plugin->getPluginKey(),
|
c@96
|
287 plugin->getInputSampleRate(),
|
c@96
|
288 plugin->getAdapterFlags(),
|
c@96
|
289 psd, defaultConfig);
|
c@96
|
290
|
c@96
|
291 m_mapper.addPlugin(handle, plugin);
|
c@96
|
292
|
c@96
|
293 (void)configure(plugin, config);
|
c@96
|
294 }
|
c@96
|
295
|
c@96
|
296 private:
|
c@96
|
297 AssignedPluginHandleMapper m_mapper;
|
c@96
|
298 ReqId getId() {
|
c@96
|
299 //!!! todo: mutex
|
c@96
|
300 static ReqId m_nextId = 0;
|
c@96
|
301 return m_nextId++;
|
c@96
|
302 }
|
c@96
|
303
|
c@96
|
304 static
|
c@96
|
305 kj::Array<capnp::word>
|
c@96
|
306 toKJArray(const std::vector<char> &buffer) {
|
c@118
|
307 // We could do this whole thing with fewer copies, but let's
|
c@118
|
308 // see whether it matters first
|
c@96
|
309 size_t wordSize = sizeof(capnp::word);
|
c@118
|
310 size_t words = buffer.size() / wordSize;
|
c@118
|
311 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
|
c@118
|
312 memcpy(karr.begin(), buffer.data(), words * wordSize);
|
c@118
|
313 return karr;
|
c@96
|
314 }
|
c@96
|
315
|
c@96
|
316 void
|
c@97
|
317 checkResponseType(const piper::RpcResponse::Reader &r,
|
c@97
|
318 piper::RpcResponse::Response::Which type,
|
c@96
|
319 ReqId id) {
|
c@96
|
320
|
c@96
|
321 if (r.getResponse().which() != type) {
|
c@118
|
322 std::cerr << "checkResponseType: wrong response type (received "
|
c@118
|
323 << int(r.getResponse().which()) << ", expected "
|
c@118
|
324 << int(type) << ")"
|
c@118
|
325 << std::endl;
|
c@96
|
326 throw std::runtime_error("Wrong response type");
|
c@96
|
327 }
|
c@96
|
328 if (ReqId(r.getId().getNumber()) != id) {
|
c@118
|
329 std::cerr << "checkResponseType: wrong response id (received "
|
c@118
|
330 << r.getId().getNumber() << ", expected " << id << ")"
|
c@118
|
331 << std::endl;
|
c@96
|
332 throw std::runtime_error("Wrong response id");
|
c@96
|
333 }
|
c@96
|
334 }
|
c@96
|
335
|
c@96
|
336 kj::Array<capnp::word>
|
c@126
|
337 call(capnp::MallocMessageBuilder &message, bool slow) {
|
c@96
|
338 auto arr = capnp::messageToFlatArray(message);
|
c@96
|
339 auto responseBuffer = m_transport->call(arr.asChars().begin(),
|
c@126
|
340 arr.asChars().size(),
|
c@126
|
341 slow);
|
c@118
|
342 return toKJArray(responseBuffer);
|
c@96
|
343 }
|
c@96
|
344
|
c@96
|
345 PluginHandleMapper::Handle
|
c@96
|
346 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
|
c@97
|
347 PluginStaticData &psd,
|
c@97
|
348 PluginConfiguration &defaultConfig) {
|
c@96
|
349
|
c@97
|
350 LoadRequest request;
|
c@96
|
351 request.pluginKey = key;
|
c@96
|
352 request.inputSampleRate = inputSampleRate;
|
c@96
|
353 request.adapterFlags = adapterFlags;
|
c@96
|
354
|
c@96
|
355 capnp::MallocMessageBuilder message;
|
c@97
|
356 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
|
c@96
|
357
|
c@96
|
358 VampnProto::buildRpcRequest_Load(builder, request);
|
c@96
|
359 ReqId id = getId();
|
c@96
|
360 builder.getId().setNumber(id);
|
c@96
|
361
|
c@126
|
362 auto karr = call(message, false);
|
c@96
|
363
|
c@96
|
364 //!!! ... --> will also need some way to kill this process
|
c@96
|
365 //!!! (from another thread)
|
c@96
|
366
|
c@96
|
367 capnp::FlatArrayMessageReader responseMessage(karr);
|
c@97
|
368 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
|
c@96
|
369
|
c@96
|
370 //!!! handle (explicit) error case
|
c@96
|
371
|
c@97
|
372 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
|
c@96
|
373
|
c@97
|
374 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
|
c@96
|
375 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
|
c@96
|
376 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
|
c@96
|
377 return lr.getHandle();
|
c@96
|
378 };
|
c@96
|
379
|
c@96
|
380 private:
|
c@96
|
381 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
|
c@96
|
382 CompletenessChecker *m_completenessChecker; // I own this
|
c@96
|
383 };
|
c@96
|
384
|
c@96
|
385 }
|
c@96
|
386 }
|
c@96
|
387
|
c@96
|
388 #endif
|