annotate vamp-client/CapnpRRClient.h @ 131:183fe1f03980

Update client to support list from
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 02 Nov 2016 18:39:51 +0000
parents 2004ec2b653e
children d04958b5d3ad
rev   line source
cannam@111 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
c@118 2 /*
c@118 3 Piper C++
c@118 4
c@118 5 An API for audio analysis and feature extraction plugins.
c@118 6
c@118 7 Centre for Digital Music, Queen Mary, University of London.
c@118 8 Copyright 2006-2016 Chris Cannam and QMUL.
c@118 9
c@118 10 Permission is hereby granted, free of charge, to any person
c@118 11 obtaining a copy of this software and associated documentation
c@118 12 files (the "Software"), to deal in the Software without
c@118 13 restriction, including without limitation the rights to use, copy,
c@118 14 modify, merge, publish, distribute, sublicense, and/or sell copies
c@118 15 of the Software, and to permit persons to whom the Software is
c@118 16 furnished to do so, subject to the following conditions:
c@118 17
c@118 18 The above copyright notice and this permission notice shall be
c@118 19 included in all copies or substantial portions of the Software.
c@118 20
c@118 21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
c@118 22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
c@118 23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
c@118 24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
c@118 25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
c@118 26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
c@118 27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
c@118 28
c@118 29 Except as contained in this notice, the names of the Centre for
c@118 30 Digital Music; Queen Mary, University of London; and Chris Cannam
c@118 31 shall not be used in advertising or otherwise to promote the sale,
c@118 32 use or other dealings in this Software without prior written
c@118 33 authorization.
c@118 34 */
c@96 35
c@96 36 #ifndef PIPER_CAPNP_CLIENT_H
c@96 37 #define PIPER_CAPNP_CLIENT_H
c@96 38
#include "Loader.h"
#include "PluginClient.h"
#include "PluginStub.h"
#include "SynchronousTransport.h"

#include "vamp-support/AssignedPluginHandleMapper.h"
#include "vamp-capnp/VampnProto.h"

#include <capnp/serialize.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
c@96 48
c@97 49 namespace piper_vamp {
c@97 50 namespace client {
c@96 51
c@100 52 /**
c@100 53 * Client for a request-response Piper server, i.e. using the
c@100 54 * RpcRequest/RpcResponse structures with a single process call rather
c@100 55 * than having individual RPC methods, with a synchronous transport
c@100 56 * such as a subprocess pipe arrangement. Only one request can be
c@100 57 * handled at a time. This class is thread-safe if and only if it is
c@100 58 * constructed with a thread-safe SynchronousTransport implementation.
c@100 59 */
c@96 60 class CapnpRRClient : public PluginClient,
c@118 61 public Loader
c@96 62 {
c@96 63 // unsigned to avoid undefined behaviour on possible wrap
c@96 64 typedef uint32_t ReqId;
c@96 65
c@96 66 class CompletenessChecker : public MessageCompletenessChecker {
c@96 67 public:
c@96 68 bool isComplete(const std::vector<char> &message) const override {
c@96 69 auto karr = toKJArray(message);
c@96 70 size_t words = karr.size();
c@96 71 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
c@96 72 if (words > expected) {
c@96 73 std::cerr << "WARNING: obtained more data than expected ("
c@96 74 << words << " " << sizeof(capnp::word)
c@96 75 << "-byte words, expected "
c@96 76 << expected << ")" << std::endl;
c@96 77 }
c@96 78 return words >= expected;
c@96 79 }
c@96 80 };
c@96 81
c@96 82 public:
c@96 83 CapnpRRClient(SynchronousTransport *transport) : //!!! ownership? shared ptr?
c@96 84 m_transport(transport),
c@96 85 m_completenessChecker(new CompletenessChecker) {
c@96 86 transport->setCompletenessChecker(m_completenessChecker);
c@96 87 }
c@96 88
c@96 89 ~CapnpRRClient() {
c@96 90 delete m_completenessChecker;
c@96 91 }
c@96 92
c@96 93 //!!! obviously, factor out all repetitive guff
c@96 94
c@96 95 //!!! list and load are supposed to be called by application code,
c@96 96 //!!! but the rest are only supposed to be called by the plugin --
c@96 97 //!!! sort out the api here
c@96 98
c@96 99 // Loader methods:
c@96 100
c@97 101 ListResponse
c@131 102 listPluginData(std::vector<std::string> from) override {
c@96 103
c@96 104 if (!m_transport->isOK()) {
c@126 105 throw std::runtime_error("Piper server crashed or failed to start");
c@96 106 }
c@96 107
c@96 108 capnp::MallocMessageBuilder message;
c@118 109 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@131 110 ListRequest req;
c@131 111 req.from = from;
c@131 112 VampnProto::buildRpcRequest_List(builder, req);
c@96 113 ReqId id = getId();
c@96 114 builder.getId().setNumber(id);
c@96 115
c@126 116 auto karr = call(message, true);
c@96 117
c@96 118 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 119 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 120
c@97 121 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
c@96 122
c@97 123 ListResponse lr;
c@96 124 VampnProto::readListResponse(lr, reader.getResponse().getList());
c@96 125 return lr;
c@96 126 }
c@96 127
c@97 128 LoadResponse
c@97 129 loadPlugin(const LoadRequest &req) override {
c@96 130
c@96 131 if (!m_transport->isOK()) {
c@126 132 throw std::runtime_error("Piper server crashed or failed to start");
c@96 133 }
c@96 134
c@97 135 LoadResponse resp;
c@96 136 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
c@96 137 req.inputSampleRate,
c@96 138 req.adapterFlags,
c@96 139 resp.staticData,
c@96 140 resp.defaultConfiguration);
c@96 141
c@96 142 Vamp::Plugin *plugin = new PluginStub(this,
c@96 143 req.pluginKey,
c@96 144 req.inputSampleRate,
c@96 145 req.adapterFlags,
c@96 146 resp.staticData,
c@96 147 resp.defaultConfiguration);
c@96 148
c@96 149 m_mapper.addPlugin(handle, plugin);
c@96 150
c@96 151 resp.plugin = plugin;
c@96 152 return resp;
c@96 153 }
c@96 154
c@96 155 // PluginClient methods:
c@96 156
c@96 157 virtual
c@96 158 Vamp::Plugin::OutputList
c@96 159 configure(PluginStub *plugin,
c@97 160 PluginConfiguration config) override {
c@96 161
c@96 162 if (!m_transport->isOK()) {
c@126 163 throw std::runtime_error("Piper server crashed or failed to start");
c@96 164 }
c@96 165
c@97 166 ConfigurationRequest request;
c@96 167 request.plugin = plugin;
c@96 168 request.configuration = config;
c@96 169
c@96 170 capnp::MallocMessageBuilder message;
c@97 171 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 172
c@96 173 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@96 174 ReqId id = getId();
c@96 175 builder.getId().setNumber(id);
c@96 176
c@126 177 auto karr = call(message, true);
c@96 178
c@96 179 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 180 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 181
c@96 182 //!!! handle (explicit) error case
c@96 183
c@97 184 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
c@96 185
c@97 186 ConfigurationResponse cr;
c@96 187 VampnProto::readConfigurationResponse(cr,
c@96 188 reader.getResponse().getConfigure(),
c@96 189 m_mapper);
c@96 190
c@96 191 return cr.outputs;
c@96 192 };
c@96 193
c@96 194 virtual
c@96 195 Vamp::Plugin::FeatureSet
c@96 196 process(PluginStub *plugin,
c@96 197 std::vector<std::vector<float> > inputBuffers,
c@96 198 Vamp::RealTime timestamp) override {
c@96 199
c@96 200 if (!m_transport->isOK()) {
c@126 201 throw std::runtime_error("Piper server crashed or failed to start");
c@96 202 }
c@96 203
c@97 204 ProcessRequest request;
c@96 205 request.plugin = plugin;
c@96 206 request.inputBuffers = inputBuffers;
c@96 207 request.timestamp = timestamp;
c@96 208
c@96 209 capnp::MallocMessageBuilder message;
c@97 210 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 211 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@118 212 ReqId id = getId();
c@96 213 builder.getId().setNumber(id);
c@96 214
c@126 215 auto karr = call(message, false);
c@96 216
c@96 217 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 218 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 219
c@96 220 //!!! handle (explicit) error case
c@96 221
c@97 222 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
c@96 223
c@97 224 ProcessResponse pr;
c@96 225 VampnProto::readProcessResponse(pr,
c@96 226 reader.getResponse().getProcess(),
c@96 227 m_mapper);
c@96 228
c@96 229 return pr.features;
c@96 230 }
c@96 231
c@96 232 virtual Vamp::Plugin::FeatureSet
c@96 233 finish(PluginStub *plugin) override {
c@96 234
c@96 235 if (!m_transport->isOK()) {
c@126 236 throw std::runtime_error("Piper server crashed or failed to start");
c@96 237 }
c@96 238
c@97 239 FinishRequest request;
c@96 240 request.plugin = plugin;
c@96 241
c@96 242 capnp::MallocMessageBuilder message;
c@97 243 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 244
c@96 245 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@96 246 ReqId id = getId();
c@96 247 builder.getId().setNumber(id);
c@96 248
c@126 249 auto karr = call(message, true);
c@96 250
c@96 251 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 252 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 253
c@96 254 //!!! handle (explicit) error case
c@96 255
c@97 256 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
c@96 257
c@97 258 FinishResponse pr;
c@96 259 VampnProto::readFinishResponse(pr,
c@96 260 reader.getResponse().getFinish(),
c@96 261 m_mapper);
c@96 262
c@96 263 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@96 264
c@118 265 // Don't delete the plugin. It's the plugin that is supposed
c@118 266 // to be calling us here
c@96 267
c@96 268 return pr.features;
c@96 269 }
c@96 270
c@96 271 virtual void
c@96 272 reset(PluginStub *plugin,
c@97 273 PluginConfiguration config) override {
c@96 274
c@96 275 // Reload the plugin on the server side, and configure it as requested
c@96 276
c@96 277 if (!m_transport->isOK()) {
c@126 278 throw std::runtime_error("Piper server crashed or failed to start");
c@96 279 }
c@96 280
c@96 281 if (m_mapper.havePlugin(plugin)) {
c@96 282 (void)finish(plugin); // server-side unload
c@96 283 }
c@96 284
c@97 285 PluginStaticData psd;
c@97 286 PluginConfiguration defaultConfig;
c@96 287 PluginHandleMapper::Handle handle =
c@96 288 serverLoad(plugin->getPluginKey(),
c@96 289 plugin->getInputSampleRate(),
c@96 290 plugin->getAdapterFlags(),
c@96 291 psd, defaultConfig);
c@96 292
c@96 293 m_mapper.addPlugin(handle, plugin);
c@96 294
c@96 295 (void)configure(plugin, config);
c@96 296 }
c@96 297
c@96 298 private:
c@96 299 AssignedPluginHandleMapper m_mapper;
c@96 300 ReqId getId() {
c@96 301 //!!! todo: mutex
c@96 302 static ReqId m_nextId = 0;
c@96 303 return m_nextId++;
c@96 304 }
c@96 305
c@96 306 static
c@96 307 kj::Array<capnp::word>
c@96 308 toKJArray(const std::vector<char> &buffer) {
c@118 309 // We could do this whole thing with fewer copies, but let's
c@118 310 // see whether it matters first
c@96 311 size_t wordSize = sizeof(capnp::word);
c@118 312 size_t words = buffer.size() / wordSize;
c@118 313 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@118 314 memcpy(karr.begin(), buffer.data(), words * wordSize);
c@118 315 return karr;
c@96 316 }
c@96 317
c@96 318 void
c@97 319 checkResponseType(const piper::RpcResponse::Reader &r,
c@97 320 piper::RpcResponse::Response::Which type,
c@96 321 ReqId id) {
c@96 322
c@96 323 if (r.getResponse().which() != type) {
c@118 324 std::cerr << "checkResponseType: wrong response type (received "
c@118 325 << int(r.getResponse().which()) << ", expected "
c@118 326 << int(type) << ")"
c@118 327 << std::endl;
c@96 328 throw std::runtime_error("Wrong response type");
c@96 329 }
c@96 330 if (ReqId(r.getId().getNumber()) != id) {
c@118 331 std::cerr << "checkResponseType: wrong response id (received "
c@118 332 << r.getId().getNumber() << ", expected " << id << ")"
c@118 333 << std::endl;
c@96 334 throw std::runtime_error("Wrong response id");
c@96 335 }
c@96 336 }
c@96 337
c@96 338 kj::Array<capnp::word>
c@126 339 call(capnp::MallocMessageBuilder &message, bool slow) {
c@96 340 auto arr = capnp::messageToFlatArray(message);
c@96 341 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@126 342 arr.asChars().size(),
c@126 343 slow);
c@118 344 return toKJArray(responseBuffer);
c@96 345 }
c@96 346
c@96 347 PluginHandleMapper::Handle
c@96 348 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
c@97 349 PluginStaticData &psd,
c@97 350 PluginConfiguration &defaultConfig) {
c@96 351
c@97 352 LoadRequest request;
c@96 353 request.pluginKey = key;
c@96 354 request.inputSampleRate = inputSampleRate;
c@96 355 request.adapterFlags = adapterFlags;
c@96 356
c@96 357 capnp::MallocMessageBuilder message;
c@97 358 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 359
c@96 360 VampnProto::buildRpcRequest_Load(builder, request);
c@96 361 ReqId id = getId();
c@96 362 builder.getId().setNumber(id);
c@96 363
c@126 364 auto karr = call(message, false);
c@96 365
c@96 366 //!!! ... --> will also need some way to kill this process
c@96 367 //!!! (from another thread)
c@96 368
c@96 369 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 370 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 371
c@96 372 //!!! handle (explicit) error case
c@96 373
c@97 374 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
c@96 375
c@97 376 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@96 377 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@96 378 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@96 379 return lr.getHandle();
c@96 380 };
c@96 381
c@96 382 private:
c@96 383 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@96 384 CompletenessChecker *m_completenessChecker; // I own this
c@96 385 };
c@96 386
c@96 387 }
c@96 388 }
c@96 389
c@96 390 #endif