annotate vamp-client/CapnpRRClient.h @ 186:52322dde68ea

Fix erroneous logic for handling step and block size in prior commit. The earlier change had a logical misconception. If PluginStub is receiving the correct step and block size back from the configure call, the plugin on the server side must have already been successfully initialised, as the step and block size are only returned in a successful configure response. This means the test for a failed initialise and redo with the correct parameters must be done on the server side (in LoaderRequests), not the client. The client has a more complicated job, which is to notice that a *successful* configure has returned different framing parameters from those passed to the initialise call, and to pretend that it had actually failed until the host calls again with the correct parameters. We definitely need tests for this!
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 06 Feb 2017 16:44:33 +0000
parents 3eb00e5c76c4
children ad6025dc0b04 61034472c304
rev   line source
cannam@111 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
c@118 2 /*
c@118 3 Piper C++
c@118 4
c@118 5 An API for audio analysis and feature extraction plugins.
c@118 6
c@118 7 Centre for Digital Music, Queen Mary, University of London.
c@118 8 Copyright 2006-2016 Chris Cannam and QMUL.
c@118 9
c@118 10 Permission is hereby granted, free of charge, to any person
c@118 11 obtaining a copy of this software and associated documentation
c@118 12 files (the "Software"), to deal in the Software without
c@118 13 restriction, including without limitation the rights to use, copy,
c@118 14 modify, merge, publish, distribute, sublicense, and/or sell copies
c@118 15 of the Software, and to permit persons to whom the Software is
c@118 16 furnished to do so, subject to the following conditions:
c@118 17
c@118 18 The above copyright notice and this permission notice shall be
c@118 19 included in all copies or substantial portions of the Software.
c@118 20
c@118 21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
c@118 22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
c@118 23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
c@118 24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
c@118 25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
c@118 26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
c@118 27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
c@118 28
c@118 29 Except as contained in this notice, the names of the Centre for
c@118 30 Digital Music; Queen Mary, University of London; and Chris Cannam
c@118 31 shall not be used in advertising or otherwise to promote the sale,
c@118 32 use or other dealings in this Software without prior written
c@118 33 authorization.
c@118 34 */
c@96 35
c@96 36 #ifndef PIPER_CAPNP_CLIENT_H
c@96 37 #define PIPER_CAPNP_CLIENT_H
c@96 38
#include "Loader.h"
#include "PluginClient.h"
#include "PluginStub.h"
#include "SynchronousTransport.h"

#include "vamp-support/AssignedPluginHandleMapper.h"
#include "vamp-capnp/VampnProto.h"

#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include <capnp/serialize.h>
c@96 50
c@144 51 //#define LOG_ENTRYPOINTS 1
c@142 52
// LOG_E(x) is used to trace calls into and returns from the client's
// entry points. It forwards to log(x) only when LOG_ENTRYPOINTS is
// defined; otherwise it expands to nothing.
#ifndef LOG_ENTRYPOINTS
#define LOG_E(x)
#else
#define LOG_E(x) log(x)
#endif
c@142 58
c@97 59 namespace piper_vamp {
c@97 60 namespace client {
c@96 61
c@100 62 /**
c@100 63 * Client for a request-response Piper server, i.e. using the
c@100 64 * RpcRequest/RpcResponse structures with a single process call rather
c@100 65 * than having individual RPC methods, with a synchronous transport
c@100 66 * such as a subprocess pipe arrangement. Only one request can be
c@100 67 * handled at a time. This class is thread-safe if and only if it is
c@100 68 * constructed with a thread-safe SynchronousTransport implementation.
c@100 69 */
c@96 70 class CapnpRRClient : public PluginClient,
c@118 71 public Loader
c@96 72 {
c@96 73 // unsigned to avoid undefined behaviour on possible wrap
c@96 74 typedef uint32_t ReqId;
c@96 75
c@96 76 class CompletenessChecker : public MessageCompletenessChecker {
c@96 77 public:
c@146 78 State check(const std::vector<char> &message) const override {
c@146 79
c@147 80 if (message.size() < sizeof(capnp::word)) {
c@147 81 return Incomplete;
c@147 82 }
c@147 83
c@96 84 auto karr = toKJArray(message);
c@96 85 size_t words = karr.size();
c@96 86 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
c@146 87
c@146 88 // Lacking a way to definitively check whether a message
c@146 89 // is valid or not, we would still like to trap obvious
c@146 90 // cases where a programming mistake results in garbage
c@146 91 // being returned from the server. We impose a limit on
c@146 92 // message size and, if a prefix is projected to exceed
c@146 93 // that limit, call it invalid. If an extractor wants to
c@146 94 // return a feature set greater than a gigaword in size,
c@146 95 // it'll just have to do it across multiple process calls.
c@146 96 size_t limit = size_t(1) << 30;
c@146 97
c@145 98 // cerr << "CompletenessChecker: message.size() = " << message.size()
c@146 99 // << ", words = " << words << ", limit = " << limit << ", expected = " << expected << endl;
c@146 100
c@96 101 if (words > expected) {
c@96 102 std::cerr << "WARNING: obtained more data than expected ("
c@96 103 << words << " " << sizeof(capnp::word)
c@96 104 << "-byte words, expected "
c@96 105 << expected << ")" << std::endl;
c@146 106 return Complete;
c@146 107 } else if (words == expected) {
c@146 108 return Complete;
c@146 109 } else if (expected > limit) {
c@147 110 std::cerr << "WARNING: apparently invalid message prefix: have "
c@147 111 << words << " words in prefix, projected message size is "
c@147 112 << expected << " against limit of " << limit << std::endl;
c@146 113 return Invalid;
c@146 114 } else {
c@146 115 return Incomplete;
c@96 116 }
c@96 117 }
c@96 118 };
c@96 119
c@96 120 public:
c@134 121 CapnpRRClient(SynchronousTransport *transport, //!!! ownership? shared ptr?
c@134 122 LogCallback *logger) : // logger may be nullptr for cerr
c@134 123 m_logger(logger),
c@96 124 m_transport(transport),
c@96 125 m_completenessChecker(new CompletenessChecker) {
c@96 126 transport->setCompletenessChecker(m_completenessChecker);
c@96 127 }
c@96 128
c@96 129 ~CapnpRRClient() {
c@96 130 delete m_completenessChecker;
c@96 131 }
c@96 132
c@96 133 //!!! obviously, factor out all repetitive guff
c@96 134
c@96 135 //!!! list and load are supposed to be called by application code,
c@96 136 //!!! but the rest are only supposed to be called by the plugin --
c@96 137 //!!! sort out the api here
c@96 138
c@96 139 // Loader methods:
c@96 140
c@97 141 ListResponse
c@132 142 listPluginData(const ListRequest &req) override {
c@96 143
c@142 144 LOG_E("CapnpRRClient::listPluginData called");
c@142 145
cannam@170 146 checkServerOK();
cannam@170 147
c@96 148 capnp::MallocMessageBuilder message;
c@118 149 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@131 150 VampnProto::buildRpcRequest_List(builder, req);
c@96 151 ReqId id = getId();
c@96 152 builder.getId().setNumber(id);
c@96 153
c@134 154 auto karr = call(message, "list", true);
c@96 155
c@96 156 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 157 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 158
c@97 159 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
c@96 160
c@97 161 ListResponse lr;
c@96 162 VampnProto::readListResponse(lr, reader.getResponse().getList());
c@142 163
c@142 164 LOG_E("CapnpRRClient::listPluginData returning");
c@142 165
c@96 166 return lr;
c@96 167 }
c@96 168
c@97 169 LoadResponse
c@97 170 loadPlugin(const LoadRequest &req) override {
c@96 171
c@142 172 LOG_E("CapnpRRClient::loadPlugin called");
c@142 173
cannam@170 174 checkServerOK();
cannam@170 175
c@97 176 LoadResponse resp;
c@96 177 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
c@96 178 req.inputSampleRate,
c@96 179 req.adapterFlags,
c@96 180 resp.staticData,
c@96 181 resp.defaultConfiguration);
c@96 182
c@96 183 Vamp::Plugin *plugin = new PluginStub(this,
c@96 184 req.pluginKey,
c@96 185 req.inputSampleRate,
c@96 186 req.adapterFlags,
c@96 187 resp.staticData,
c@96 188 resp.defaultConfiguration);
c@96 189
c@96 190 m_mapper.addPlugin(handle, plugin);
c@96 191
c@96 192 resp.plugin = plugin;
c@142 193
c@142 194 LOG_E("CapnpRRClient::loadPlugin returning");
c@142 195
c@96 196 return resp;
c@96 197 }
c@96 198
c@96 199 // PluginClient methods:
c@96 200
c@96 201 virtual
cannam@185 202 ConfigurationResponse
c@96 203 configure(PluginStub *plugin,
c@97 204 PluginConfiguration config) override {
c@96 205
c@142 206 LOG_E("CapnpRRClient::configure called");
cannam@170 207
cannam@170 208 checkServerOK();
c@142 209
c@97 210 ConfigurationRequest request;
c@96 211 request.plugin = plugin;
c@96 212 request.configuration = config;
c@96 213
c@96 214 capnp::MallocMessageBuilder message;
c@97 215 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 216
c@96 217 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@96 218 ReqId id = getId();
c@96 219 builder.getId().setNumber(id);
c@96 220
c@134 221 auto karr = call(message, "configure", true);
c@96 222
c@96 223 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 224 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 225
c@97 226 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
c@96 227
c@97 228 ConfigurationResponse cr;
c@96 229 VampnProto::readConfigurationResponse(cr,
c@96 230 reader.getResponse().getConfigure(),
c@96 231 m_mapper);
c@96 232
c@142 233 LOG_E("CapnpRRClient::configure returning");
c@142 234
cannam@185 235 return cr;
c@96 236 };
c@96 237
c@96 238 virtual
c@96 239 Vamp::Plugin::FeatureSet
c@96 240 process(PluginStub *plugin,
c@96 241 std::vector<std::vector<float> > inputBuffers,
c@96 242 Vamp::RealTime timestamp) override {
c@96 243
c@142 244 LOG_E("CapnpRRClient::process called");
c@142 245
cannam@170 246 checkServerOK();
cannam@170 247
c@97 248 ProcessRequest request;
c@96 249 request.plugin = plugin;
c@96 250 request.inputBuffers = inputBuffers;
c@96 251 request.timestamp = timestamp;
c@96 252
c@96 253 capnp::MallocMessageBuilder message;
c@97 254 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 255 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@118 256 ReqId id = getId();
c@96 257 builder.getId().setNumber(id);
c@96 258
c@134 259 auto karr = call(message, "process", false);
c@96 260
c@96 261 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 262 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 263
c@97 264 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
c@96 265
c@97 266 ProcessResponse pr;
c@96 267 VampnProto::readProcessResponse(pr,
c@96 268 reader.getResponse().getProcess(),
c@96 269 m_mapper);
c@96 270
c@142 271 LOG_E("CapnpRRClient::process returning");
c@142 272
c@96 273 return pr.features;
c@96 274 }
c@96 275
c@96 276 virtual Vamp::Plugin::FeatureSet
c@96 277 finish(PluginStub *plugin) override {
c@96 278
c@142 279 LOG_E("CapnpRRClient::finish called");
c@142 280
cannam@170 281 checkServerOK();
cannam@170 282
c@97 283 FinishRequest request;
c@96 284 request.plugin = plugin;
c@96 285
c@96 286 capnp::MallocMessageBuilder message;
c@97 287 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 288
c@96 289 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@96 290 ReqId id = getId();
c@96 291 builder.getId().setNumber(id);
c@96 292
c@134 293 auto karr = call(message, "finish", true);
c@96 294
c@96 295 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 296 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 297
c@97 298 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
c@96 299
c@97 300 FinishResponse pr;
c@96 301 VampnProto::readFinishResponse(pr,
c@96 302 reader.getResponse().getFinish(),
c@96 303 m_mapper);
c@96 304
c@96 305 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@96 306
c@118 307 // Don't delete the plugin. It's the plugin that is supposed
c@118 308 // to be calling us here
c@96 309
c@142 310 LOG_E("CapnpRRClient::finish returning");
c@142 311
c@96 312 return pr.features;
c@96 313 }
c@96 314
c@96 315 virtual void
c@96 316 reset(PluginStub *plugin,
c@97 317 PluginConfiguration config) override {
c@96 318
c@96 319 // Reload the plugin on the server side, and configure it as requested
c@134 320
c@134 321 log("CapnpRRClient: reset() called, plugin will be closed and reloaded");
c@96 322
cannam@170 323 checkServerOK();
cannam@170 324
c@96 325 if (m_mapper.havePlugin(plugin)) {
c@96 326 (void)finish(plugin); // server-side unload
c@96 327 }
c@96 328
c@97 329 PluginStaticData psd;
c@97 330 PluginConfiguration defaultConfig;
c@96 331 PluginHandleMapper::Handle handle =
c@96 332 serverLoad(plugin->getPluginKey(),
c@96 333 plugin->getInputSampleRate(),
c@96 334 plugin->getAdapterFlags(),
c@96 335 psd, defaultConfig);
c@96 336
c@96 337 m_mapper.addPlugin(handle, plugin);
c@96 338
c@96 339 (void)configure(plugin, config);
c@96 340 }
c@96 341
c@96 342 private:
c@96 343 AssignedPluginHandleMapper m_mapper;
c@96 344 ReqId getId() {
c@96 345 //!!! todo: mutex
c@96 346 static ReqId m_nextId = 0;
c@96 347 return m_nextId++;
c@96 348 }
c@96 349
c@96 350 static
c@96 351 kj::Array<capnp::word>
c@96 352 toKJArray(const std::vector<char> &buffer) {
c@118 353 // We could do this whole thing with fewer copies, but let's
c@118 354 // see whether it matters first
c@96 355 size_t wordSize = sizeof(capnp::word);
c@118 356 size_t words = buffer.size() / wordSize;
c@118 357 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@118 358 memcpy(karr.begin(), buffer.data(), words * wordSize);
c@118 359 return karr;
c@96 360 }
c@96 361
c@96 362 void
cannam@170 363 checkServerOK() {
cannam@170 364 if (!m_transport->isOK()) {
cannam@170 365 log("Piper server crashed or failed to start (caller should have checked this)");
cannam@170 366 throw ServerCrashed();
cannam@170 367 }
cannam@170 368 }
cannam@170 369
cannam@170 370 /**
cannam@170 371 * Check (i) that the response has the same id as supplied (which
cannam@170 372 * presumably is the corresponding request id) and (ii) that the
cannam@170 373 * response has the expected type.
cannam@170 374 *
cannam@170 375 * Return only if both of these things are the case.
cannam@170 376 *
cannam@170 377 * If the response has the right id but is an error response,
cannam@170 378 * throw a ServiceError exception with the error response's
cannam@170 379 * message in it.
cannam@170 380 *
cannam@170 381 * If the response has the wrong id, or if it has the wrong type
cannam@170 382 * and is not an error response, throw ProtocolError. (i.e. for
cannam@170 383 * cases having errors that are not conveyed through our official
cannam@170 384 * error response.)
cannam@170 385 */
cannam@170 386 void
c@97 387 checkResponseType(const piper::RpcResponse::Reader &r,
c@97 388 piper::RpcResponse::Response::Which type,
c@96 389 ReqId id) {
c@96 390
c@96 391 if (ReqId(r.getId().getNumber()) != id) {
c@134 392 std::ostringstream s;
c@134 393 s << "checkResponseType: wrong response id (received "
c@134 394 << r.getId().getNumber() << ", expected " << id << ")";
c@134 395 log(s.str());
cannam@170 396 throw ProtocolError("Wrong response id");
cannam@170 397 }
cannam@170 398 if (r.getResponse().which() != type) {
cannam@170 399 if (r.getResponse().which() == piper::RpcResponse::Response::Which::ERROR) {
cannam@170 400 int code;
cannam@170 401 std::string message;
cannam@170 402 VampnProto::readRpcResponse_Error(code, message, r);
cannam@170 403 std::ostringstream s;
cannam@170 404 s << "checkResponseType: received an error with message: "
cannam@170 405 << message;
cannam@170 406 log(s.str());
cannam@170 407 throw ServiceError(message);
cannam@170 408 } else {
cannam@170 409 std::ostringstream s;
cannam@170 410 s << "checkResponseType: wrong response type (received "
cannam@170 411 << int(r.getResponse().which()) << ", expected " << int(type) << ")";
cannam@170 412 log(s.str());
cannam@170 413 throw ProtocolError("Wrong response type");
cannam@170 414 }
c@96 415 }
c@96 416 }
c@96 417
c@96 418 kj::Array<capnp::word>
c@134 419 call(capnp::MallocMessageBuilder &message, std::string type, bool slow) {
c@96 420 auto arr = capnp::messageToFlatArray(message);
c@96 421 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@126 422 arr.asChars().size(),
c@134 423 type,
c@126 424 slow);
c@118 425 return toKJArray(responseBuffer);
c@96 426 }
c@96 427
c@96 428 PluginHandleMapper::Handle
c@96 429 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
c@97 430 PluginStaticData &psd,
c@97 431 PluginConfiguration &defaultConfig) {
c@96 432
c@97 433 LoadRequest request;
c@96 434 request.pluginKey = key;
c@96 435 request.inputSampleRate = inputSampleRate;
c@96 436 request.adapterFlags = adapterFlags;
c@96 437
c@96 438 capnp::MallocMessageBuilder message;
c@97 439 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 440
c@96 441 VampnProto::buildRpcRequest_Load(builder, request);
c@96 442 ReqId id = getId();
c@96 443 builder.getId().setNumber(id);
c@96 444
c@134 445 auto karr = call(message, "load", false);
c@96 446
c@96 447 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 448 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 449
c@97 450 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
c@96 451
c@97 452 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@96 453 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@96 454 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@96 455 return lr.getHandle();
c@96 456 };
c@96 457
c@96 458 private:
c@134 459 LogCallback *m_logger;
c@96 460 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@96 461 CompletenessChecker *m_completenessChecker; // I own this
c@134 462
c@134 463 void log(std::string message) const {
c@134 464 if (m_logger) m_logger->log(message);
c@134 465 else std::cerr << message << std::endl;
c@134 466 }
c@96 467 };
c@96 468
c@96 469 }
c@96 470 }
c@96 471
c@96 472 #endif