annotate vamp-client/CapnpRRClient.h @ 218:ea8994465322

Rebuild these for capnp v0.6. But it would probably be better at this point not to commit them, as the main reason they are in the repo is because the compiler wasn't available for Visual Studio builds, and that's no longer the case.
author Chris Cannam <cannam@all-day-breakfast.com>
date Tue, 09 May 2017 11:46:23 +0100
parents df65480a08de
children 3db4c7998faf
rev   line source
cannam@111 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
c@118 2 /*
c@118 3 Piper C++
c@118 4
c@118 5 An API for audio analysis and feature extraction plugins.
c@118 6
c@118 7 Centre for Digital Music, Queen Mary, University of London.
c@118 8 Copyright 2006-2016 Chris Cannam and QMUL.
c@118 9
c@118 10 Permission is hereby granted, free of charge, to any person
c@118 11 obtaining a copy of this software and associated documentation
c@118 12 files (the "Software"), to deal in the Software without
c@118 13 restriction, including without limitation the rights to use, copy,
c@118 14 modify, merge, publish, distribute, sublicense, and/or sell copies
c@118 15 of the Software, and to permit persons to whom the Software is
c@118 16 furnished to do so, subject to the following conditions:
c@118 17
c@118 18 The above copyright notice and this permission notice shall be
c@118 19 included in all copies or substantial portions of the Software.
c@118 20
c@118 21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
c@118 22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
c@118 23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
c@118 24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
c@118 25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
c@118 26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
c@118 27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
c@118 28
c@118 29 Except as contained in this notice, the names of the Centre for
c@118 30 Digital Music; Queen Mary, University of London; and Chris Cannam
c@118 31 shall not be used in advertising or otherwise to promote the sale,
c@118 32 use or other dealings in this Software without prior written
c@118 33 authorization.
c@118 34 */
c@96 35
c@96 36 #ifndef PIPER_CAPNP_CLIENT_H
c@96 37 #define PIPER_CAPNP_CLIENT_H
c@96 38
c@96 39 #include "Loader.h"
c@96 40 #include "PluginClient.h"
cannam@208 41 #include "PiperVampPlugin.h"
c@96 42 #include "SynchronousTransport.h"
c@96 43
c@96 44 #include "vamp-support/AssignedPluginHandleMapper.h"
c@96 45 #include "vamp-capnp/VampnProto.h"
c@96 46
c@134 47 #include <sstream>
c@134 48
c@96 49 #include <capnp/serialize.h>
c@96 50
c@144 51 //#define LOG_ENTRYPOINTS 1
c@142 52
c@142 53 #ifdef LOG_ENTRYPOINTS
c@142 54 #define LOG_E(x) log(x)
c@142 55 #else
c@142 56 #define LOG_E(x)
c@142 57 #endif
c@142 58
c@97 59 namespace piper_vamp {
c@97 60 namespace client {
c@96 61
c@100 62 /**
c@100 63 * Client for a request-response Piper server, i.e. using the
c@100 64 * RpcRequest/RpcResponse structures with a single process call rather
c@100 65 * than having individual RPC methods, with a synchronous transport
c@100 66 * such as a subprocess pipe arrangement. Only one request can be
c@100 67 * handled at a time. This class is thread-safe if and only if it is
c@100 68 * constructed with a thread-safe SynchronousTransport implementation.
cannam@187 69 *
cannam@187 70 * This class takes Vamp-like structures (Plugin and the classes in
cannam@187 71 * vamp-support) and uses them to communicate with a Piper server
cannam@187 72 * using the Cap'n Proto serialisation of the Piper API. The transport
cannam@187 73 * layer (and thus the nature of the server) is defined by the
cannam@187 74 * SynchronousTransport passed to the constructor.
cannam@187 75 *
cannam@187 76 * This class implements both the Loader interface (which constructs
cannam@187 77 * PluginStub objects) and the PluginClient (which accepts PluginStubs
cannam@187 78 * and maps them into Piper handles).
c@100 79 */
c@96 80 class CapnpRRClient : public PluginClient,
c@118 81 public Loader
c@96 82 {
c@96 83 // unsigned to avoid undefined behaviour on possible wrap
c@96 84 typedef uint32_t ReqId;
c@96 85
c@96 86 class CompletenessChecker : public MessageCompletenessChecker {
c@96 87 public:
c@146 88 State check(const std::vector<char> &message) const override {
c@146 89
c@147 90 if (message.size() < sizeof(capnp::word)) {
c@147 91 return Incomplete;
c@147 92 }
c@147 93
c@96 94 auto karr = toKJArray(message);
c@96 95 size_t words = karr.size();
c@96 96 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
c@146 97
c@146 98 // Lacking a way to definitively check whether a message
c@146 99 // is valid or not, we would still like to trap obvious
c@146 100 // cases where a programming mistake results in garbage
c@146 101 // being returned from the server. We impose a limit on
c@146 102 // message size and, if a prefix is projected to exceed
c@146 103 // that limit, call it invalid. If an extractor wants to
c@146 104 // return a feature set greater than a gigaword in size,
c@146 105 // it'll just have to do it across multiple process calls.
c@146 106 size_t limit = size_t(1) << 30;
c@146 107
c@145 108 // cerr << "CompletenessChecker: message.size() = " << message.size()
c@146 109 // << ", words = " << words << ", limit = " << limit << ", expected = " << expected << endl;
c@146 110
c@96 111 if (words > expected) {
c@96 112 std::cerr << "WARNING: obtained more data than expected ("
c@96 113 << words << " " << sizeof(capnp::word)
c@96 114 << "-byte words, expected "
c@96 115 << expected << ")" << std::endl;
c@146 116 return Complete;
c@146 117 } else if (words == expected) {
c@146 118 return Complete;
c@146 119 } else if (expected > limit) {
c@147 120 std::cerr << "WARNING: apparently invalid message prefix: have "
c@147 121 << words << " words in prefix, projected message size is "
c@147 122 << expected << " against limit of " << limit << std::endl;
c@146 123 return Invalid;
c@146 124 } else {
c@146 125 return Incomplete;
c@96 126 }
c@96 127 }
c@96 128 };
c@96 129
c@96 130 public:
c@134 131 CapnpRRClient(SynchronousTransport *transport, //!!! ownership? shared ptr?
c@134 132 LogCallback *logger) : // logger may be nullptr for cerr
c@134 133 m_logger(logger),
c@96 134 m_transport(transport),
c@96 135 m_completenessChecker(new CompletenessChecker) {
c@96 136 transport->setCompletenessChecker(m_completenessChecker);
c@96 137 }
c@96 138
c@96 139 ~CapnpRRClient() {
c@96 140 delete m_completenessChecker;
c@96 141 }
c@96 142
c@96 143 //!!! obviously, factor out all repetitive guff
c@96 144
c@96 145 //!!! list and load are supposed to be called by application code,
c@96 146 //!!! but the rest are only supposed to be called by the plugin --
c@96 147 //!!! sort out the api here
c@96 148
c@96 149 // Loader methods:
c@96 150
c@97 151 ListResponse
cannam@207 152 list(const ListRequest &req) override {
c@96 153
c@142 154 LOG_E("CapnpRRClient::listPluginData called");
c@142 155
cannam@170 156 checkServerOK();
cannam@170 157
c@96 158 capnp::MallocMessageBuilder message;
c@118 159 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@131 160 VampnProto::buildRpcRequest_List(builder, req);
c@96 161 ReqId id = getId();
c@96 162 builder.getId().setNumber(id);
c@96 163
c@134 164 auto karr = call(message, "list", true);
c@96 165
c@96 166 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 167 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 168
c@97 169 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
c@96 170
c@97 171 ListResponse lr;
c@96 172 VampnProto::readListResponse(lr, reader.getResponse().getList());
c@142 173
c@142 174 LOG_E("CapnpRRClient::listPluginData returning");
c@142 175
c@96 176 return lr;
c@96 177 }
c@96 178
c@97 179 LoadResponse
cannam@207 180 load(const LoadRequest &req) override {
c@96 181
c@142 182 LOG_E("CapnpRRClient::loadPlugin called");
c@142 183
cannam@170 184 checkServerOK();
cannam@170 185
c@97 186 LoadResponse resp;
c@96 187 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
c@96 188 req.inputSampleRate,
c@96 189 req.adapterFlags,
c@96 190 resp.staticData,
c@96 191 resp.defaultConfiguration);
c@96 192
cannam@208 193 Vamp::Plugin *plugin = new PiperVampPlugin(this,
cannam@208 194 req.pluginKey,
cannam@208 195 req.inputSampleRate,
cannam@208 196 req.adapterFlags,
cannam@208 197 resp.staticData,
cannam@208 198 resp.defaultConfiguration);
c@96 199
c@96 200 m_mapper.addPlugin(handle, plugin);
c@96 201
c@96 202 resp.plugin = plugin;
c@142 203
c@142 204 LOG_E("CapnpRRClient::loadPlugin returning");
c@142 205
c@96 206 return resp;
c@96 207 }
c@96 208
c@96 209 // PluginClient methods:
c@96 210
c@96 211 virtual
cannam@185 212 ConfigurationResponse
cannam@208 213 configure(PiperVampPlugin *plugin,
c@97 214 PluginConfiguration config) override {
c@96 215
c@142 216 LOG_E("CapnpRRClient::configure called");
cannam@170 217
cannam@170 218 checkServerOK();
c@142 219
c@97 220 ConfigurationRequest request;
c@96 221 request.plugin = plugin;
c@96 222 request.configuration = config;
c@96 223
c@96 224 capnp::MallocMessageBuilder message;
c@97 225 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 226
c@96 227 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@96 228 ReqId id = getId();
c@96 229 builder.getId().setNumber(id);
c@96 230
c@134 231 auto karr = call(message, "configure", true);
c@96 232
c@96 233 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 234 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 235
c@97 236 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
c@96 237
c@97 238 ConfigurationResponse cr;
c@96 239 VampnProto::readConfigurationResponse(cr,
c@96 240 reader.getResponse().getConfigure(),
c@96 241 m_mapper);
c@96 242
c@142 243 LOG_E("CapnpRRClient::configure returning");
c@142 244
cannam@185 245 return cr;
c@96 246 };
c@96 247
c@96 248 virtual
c@96 249 Vamp::Plugin::FeatureSet
cannam@208 250 process(PiperVampPlugin *plugin,
c@96 251 std::vector<std::vector<float> > inputBuffers,
c@96 252 Vamp::RealTime timestamp) override {
c@96 253
c@142 254 LOG_E("CapnpRRClient::process called");
c@142 255
cannam@170 256 checkServerOK();
cannam@170 257
c@97 258 ProcessRequest request;
c@96 259 request.plugin = plugin;
c@96 260 request.inputBuffers = inputBuffers;
c@96 261 request.timestamp = timestamp;
c@96 262
c@96 263 capnp::MallocMessageBuilder message;
c@97 264 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 265 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@118 266 ReqId id = getId();
c@96 267 builder.getId().setNumber(id);
c@96 268
c@134 269 auto karr = call(message, "process", false);
c@96 270
c@96 271 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 272 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 273
c@97 274 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
c@96 275
c@97 276 ProcessResponse pr;
c@96 277 VampnProto::readProcessResponse(pr,
c@96 278 reader.getResponse().getProcess(),
c@96 279 m_mapper);
c@96 280
c@142 281 LOG_E("CapnpRRClient::process returning");
c@142 282
c@96 283 return pr.features;
c@96 284 }
c@96 285
c@96 286 virtual Vamp::Plugin::FeatureSet
cannam@208 287 finish(PiperVampPlugin *plugin) override {
c@96 288
c@142 289 LOG_E("CapnpRRClient::finish called");
c@142 290
cannam@170 291 checkServerOK();
cannam@170 292
c@97 293 FinishRequest request;
c@96 294 request.plugin = plugin;
c@96 295
c@96 296 capnp::MallocMessageBuilder message;
c@97 297 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 298
c@96 299 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@96 300 ReqId id = getId();
c@96 301 builder.getId().setNumber(id);
c@96 302
c@134 303 auto karr = call(message, "finish", true);
c@96 304
c@96 305 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 306 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 307
c@97 308 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
c@96 309
c@97 310 FinishResponse pr;
c@96 311 VampnProto::readFinishResponse(pr,
c@96 312 reader.getResponse().getFinish(),
c@96 313 m_mapper);
c@96 314
c@96 315 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@96 316
c@118 317 // Don't delete the plugin. It's the plugin that is supposed
c@118 318 // to be calling us here
c@96 319
c@142 320 LOG_E("CapnpRRClient::finish returning");
c@142 321
c@96 322 return pr.features;
c@96 323 }
c@96 324
c@96 325 virtual void
cannam@208 326 reset(PiperVampPlugin *plugin,
c@97 327 PluginConfiguration config) override {
c@96 328
c@96 329 // Reload the plugin on the server side, and configure it as requested
c@134 330
c@134 331 log("CapnpRRClient: reset() called, plugin will be closed and reloaded");
c@96 332
cannam@170 333 checkServerOK();
cannam@170 334
c@96 335 if (m_mapper.havePlugin(plugin)) {
c@96 336 (void)finish(plugin); // server-side unload
c@96 337 }
c@96 338
c@97 339 PluginStaticData psd;
c@97 340 PluginConfiguration defaultConfig;
c@96 341 PluginHandleMapper::Handle handle =
c@96 342 serverLoad(plugin->getPluginKey(),
c@96 343 plugin->getInputSampleRate(),
c@96 344 plugin->getAdapterFlags(),
c@96 345 psd, defaultConfig);
c@96 346
c@96 347 m_mapper.addPlugin(handle, plugin);
c@96 348
c@96 349 (void)configure(plugin, config);
c@96 350 }
c@96 351
c@96 352 private:
c@96 353 AssignedPluginHandleMapper m_mapper;
c@96 354 ReqId getId() {
c@96 355 //!!! todo: mutex
c@96 356 static ReqId m_nextId = 0;
c@96 357 return m_nextId++;
c@96 358 }
c@96 359
c@96 360 static
c@96 361 kj::Array<capnp::word>
c@96 362 toKJArray(const std::vector<char> &buffer) {
c@118 363 // We could do this whole thing with fewer copies, but let's
c@118 364 // see whether it matters first
c@96 365 size_t wordSize = sizeof(capnp::word);
c@118 366 size_t words = buffer.size() / wordSize;
c@118 367 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@118 368 memcpy(karr.begin(), buffer.data(), words * wordSize);
c@118 369 return karr;
c@96 370 }
c@96 371
c@96 372 void
cannam@170 373 checkServerOK() {
cannam@170 374 if (!m_transport->isOK()) {
cannam@170 375 log("Piper server crashed or failed to start (caller should have checked this)");
cannam@170 376 throw ServerCrashed();
cannam@170 377 }
cannam@170 378 }
cannam@170 379
cannam@170 380 /**
cannam@170 381 * Check (i) that the response has the same id as supplied (which
cannam@170 382 * presumably is the corresponding request id) and (ii) that the
cannam@170 383 * response has the expected type.
cannam@170 384 *
cannam@170 385 * Return only if both of these things are the case.
cannam@170 386 *
cannam@170 387 * If the response has the right id but is an error response,
cannam@170 388 * throw a ServiceError exception with the error response's
cannam@170 389 * message in it.
cannam@170 390 *
cannam@170 391 * If the response has the wrong id, or if it has the wrong type
cannam@170 392 * and is not an error response, throw ProtocolError. (i.e. for
cannam@170 393 * cases having errors that are not conveyed through our official
cannam@170 394 * error response.)
cannam@170 395 */
cannam@170 396 void
c@97 397 checkResponseType(const piper::RpcResponse::Reader &r,
c@97 398 piper::RpcResponse::Response::Which type,
c@96 399 ReqId id) {
c@96 400
c@96 401 if (ReqId(r.getId().getNumber()) != id) {
c@134 402 std::ostringstream s;
c@134 403 s << "checkResponseType: wrong response id (received "
c@134 404 << r.getId().getNumber() << ", expected " << id << ")";
c@134 405 log(s.str());
cannam@170 406 throw ProtocolError("Wrong response id");
cannam@170 407 }
cannam@170 408 if (r.getResponse().which() != type) {
cannam@170 409 if (r.getResponse().which() == piper::RpcResponse::Response::Which::ERROR) {
cannam@170 410 int code;
cannam@170 411 std::string message;
cannam@170 412 VampnProto::readRpcResponse_Error(code, message, r);
cannam@170 413 std::ostringstream s;
cannam@170 414 s << "checkResponseType: received an error with message: "
cannam@170 415 << message;
cannam@170 416 log(s.str());
cannam@170 417 throw ServiceError(message);
cannam@170 418 } else {
cannam@170 419 std::ostringstream s;
cannam@170 420 s << "checkResponseType: wrong response type (received "
cannam@170 421 << int(r.getResponse().which()) << ", expected " << int(type) << ")";
cannam@170 422 log(s.str());
cannam@170 423 throw ProtocolError("Wrong response type");
cannam@170 424 }
c@96 425 }
c@96 426 }
c@96 427
c@96 428 kj::Array<capnp::word>
c@134 429 call(capnp::MallocMessageBuilder &message, std::string type, bool slow) {
c@96 430 auto arr = capnp::messageToFlatArray(message);
c@96 431 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@126 432 arr.asChars().size(),
c@134 433 type,
c@126 434 slow);
c@118 435 return toKJArray(responseBuffer);
c@96 436 }
c@96 437
c@96 438 PluginHandleMapper::Handle
c@96 439 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
c@97 440 PluginStaticData &psd,
c@97 441 PluginConfiguration &defaultConfig) {
c@96 442
c@97 443 LoadRequest request;
c@96 444 request.pluginKey = key;
c@96 445 request.inputSampleRate = inputSampleRate;
c@96 446 request.adapterFlags = adapterFlags;
c@96 447
c@96 448 capnp::MallocMessageBuilder message;
c@97 449 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 450
c@96 451 VampnProto::buildRpcRequest_Load(builder, request);
c@96 452 ReqId id = getId();
c@96 453 builder.getId().setNumber(id);
c@96 454
c@134 455 auto karr = call(message, "load", false);
c@96 456
c@96 457 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 458 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 459
c@97 460 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
c@96 461
c@97 462 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@96 463 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@96 464 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@96 465 return lr.getHandle();
c@96 466 };
c@96 467
c@96 468 private:
c@134 469 LogCallback *m_logger;
c@96 470 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@96 471 CompletenessChecker *m_completenessChecker; // I own this
c@134 472
c@134 473 void log(std::string message) const {
c@134 474 if (m_logger) m_logger->log(message);
c@134 475 else std::cerr << message << std::endl;
c@134 476 }
c@96 477 };
c@96 478
c@96 479 }
c@96 480 }
c@96 481
c@96 482 #endif