annotate vamp-client/CapnpRRClient.h @ 144:654e85cc3ba1

Make this log option be off by default
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 18 Jan 2017 14:19:22 +0000
parents 9768b5ca710a
children 228a66adfb30
rev   line source
cannam@111 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
c@118 2 /*
c@118 3 Piper C++
c@118 4
c@118 5 An API for audio analysis and feature extraction plugins.
c@118 6
c@118 7 Centre for Digital Music, Queen Mary, University of London.
c@118 8 Copyright 2006-2016 Chris Cannam and QMUL.
c@118 9
c@118 10 Permission is hereby granted, free of charge, to any person
c@118 11 obtaining a copy of this software and associated documentation
c@118 12 files (the "Software"), to deal in the Software without
c@118 13 restriction, including without limitation the rights to use, copy,
c@118 14 modify, merge, publish, distribute, sublicense, and/or sell copies
c@118 15 of the Software, and to permit persons to whom the Software is
c@118 16 furnished to do so, subject to the following conditions:
c@118 17
c@118 18 The above copyright notice and this permission notice shall be
c@118 19 included in all copies or substantial portions of the Software.
c@118 20
c@118 21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
c@118 22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
c@118 23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
c@118 24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
c@118 25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
c@118 26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
c@118 27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
c@118 28
c@118 29 Except as contained in this notice, the names of the Centre for
c@118 30 Digital Music; Queen Mary, University of London; and Chris Cannam
c@118 31 shall not be used in advertising or otherwise to promote the sale,
c@118 32 use or other dealings in this Software without prior written
c@118 33 authorization.
c@118 34 */
c@96 35
c@96 36 #ifndef PIPER_CAPNP_CLIENT_H
c@96 37 #define PIPER_CAPNP_CLIENT_H
c@96 38
c@96 39 #include "Loader.h"
c@96 40 #include "PluginClient.h"
c@96 41 #include "PluginStub.h"
c@96 42 #include "SynchronousTransport.h"
c@96 43
c@96 44 #include "vamp-support/AssignedPluginHandleMapper.h"
c@96 45 #include "vamp-capnp/VampnProto.h"
c@96 46
c@134 47 #include <sstream>
c@134 48
c@96 49 #include <capnp/serialize.h>
c@96 50
c@144 51 //#define LOG_ENTRYPOINTS 1
c@142 52
c@142 53 #ifdef LOG_ENTRYPOINTS
c@142 54 #define LOG_E(x) log(x)
c@142 55 #else
c@142 56 #define LOG_E(x)
c@142 57 #endif
c@142 58
c@97 59 namespace piper_vamp {
c@97 60 namespace client {
c@96 61
c@100 62 /**
c@100 63 * Client for a request-response Piper server, i.e. using the
c@100 64 * RpcRequest/RpcResponse structures with a single process call rather
c@100 65 * than having individual RPC methods, with a synchronous transport
c@100 66 * such as a subprocess pipe arrangement. Only one request can be
c@100 67 * handled at a time. This class is thread-safe if and only if it is
c@100 68 * constructed with a thread-safe SynchronousTransport implementation.
c@100 69 */
c@96 70 class CapnpRRClient : public PluginClient,
c@118 71 public Loader
c@96 72 {
c@96 73 // unsigned to avoid undefined behaviour on possible wrap
c@96 74 typedef uint32_t ReqId;
c@96 75
c@96 76 class CompletenessChecker : public MessageCompletenessChecker {
c@96 77 public:
c@96 78 bool isComplete(const std::vector<char> &message) const override {
c@96 79 auto karr = toKJArray(message);
c@96 80 size_t words = karr.size();
c@96 81 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
c@96 82 if (words > expected) {
c@96 83 std::cerr << "WARNING: obtained more data than expected ("
c@96 84 << words << " " << sizeof(capnp::word)
c@96 85 << "-byte words, expected "
c@96 86 << expected << ")" << std::endl;
c@96 87 }
c@96 88 return words >= expected;
c@96 89 }
c@96 90 };
c@96 91
c@96 92 public:
c@134 93 CapnpRRClient(SynchronousTransport *transport, //!!! ownership? shared ptr?
c@134 94 LogCallback *logger) : // logger may be nullptr for cerr
c@134 95 m_logger(logger),
c@96 96 m_transport(transport),
c@96 97 m_completenessChecker(new CompletenessChecker) {
c@96 98 transport->setCompletenessChecker(m_completenessChecker);
c@96 99 }
c@96 100
c@96 101 ~CapnpRRClient() {
c@96 102 delete m_completenessChecker;
c@96 103 }
c@96 104
c@96 105 //!!! obviously, factor out all repetitive guff
c@96 106
c@96 107 //!!! list and load are supposed to be called by application code,
c@96 108 //!!! but the rest are only supposed to be called by the plugin --
c@96 109 //!!! sort out the api here
c@96 110
c@96 111 // Loader methods:
c@96 112
c@97 113 ListResponse
c@132 114 listPluginData(const ListRequest &req) override {
c@96 115
c@142 116 LOG_E("CapnpRRClient::listPluginData called");
c@142 117
c@96 118 if (!m_transport->isOK()) {
c@134 119 log("Piper server crashed or failed to start (caller should have checked this)");
c@126 120 throw std::runtime_error("Piper server crashed or failed to start");
c@96 121 }
c@96 122
c@96 123 capnp::MallocMessageBuilder message;
c@118 124 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@131 125 VampnProto::buildRpcRequest_List(builder, req);
c@96 126 ReqId id = getId();
c@96 127 builder.getId().setNumber(id);
c@96 128
c@134 129 auto karr = call(message, "list", true);
c@96 130
c@96 131 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 132 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 133
c@97 134 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
c@96 135
c@97 136 ListResponse lr;
c@96 137 VampnProto::readListResponse(lr, reader.getResponse().getList());
c@142 138
c@142 139 LOG_E("CapnpRRClient::listPluginData returning");
c@142 140
c@96 141 return lr;
c@96 142 }
c@96 143
c@97 144 LoadResponse
c@97 145 loadPlugin(const LoadRequest &req) override {
c@96 146
c@142 147 LOG_E("CapnpRRClient::loadPlugin called");
c@142 148
c@96 149 if (!m_transport->isOK()) {
c@134 150 log("Piper server crashed or failed to start (caller should have checked this)");
c@126 151 throw std::runtime_error("Piper server crashed or failed to start");
c@96 152 }
c@96 153
c@97 154 LoadResponse resp;
c@96 155 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
c@96 156 req.inputSampleRate,
c@96 157 req.adapterFlags,
c@96 158 resp.staticData,
c@96 159 resp.defaultConfiguration);
c@96 160
c@96 161 Vamp::Plugin *plugin = new PluginStub(this,
c@96 162 req.pluginKey,
c@96 163 req.inputSampleRate,
c@96 164 req.adapterFlags,
c@96 165 resp.staticData,
c@96 166 resp.defaultConfiguration);
c@96 167
c@96 168 m_mapper.addPlugin(handle, plugin);
c@96 169
c@96 170 resp.plugin = plugin;
c@142 171
c@142 172 LOG_E("CapnpRRClient::loadPlugin returning");
c@142 173
c@96 174 return resp;
c@96 175 }
c@96 176
c@96 177 // PluginClient methods:
c@96 178
c@96 179 virtual
c@96 180 Vamp::Plugin::OutputList
c@96 181 configure(PluginStub *plugin,
c@97 182 PluginConfiguration config) override {
c@96 183
c@142 184 LOG_E("CapnpRRClient::configure called");
c@142 185
c@96 186 if (!m_transport->isOK()) {
c@134 187 log("Piper server crashed or failed to start (caller should have checked this)");
c@126 188 throw std::runtime_error("Piper server crashed or failed to start");
c@96 189 }
c@96 190
c@97 191 ConfigurationRequest request;
c@96 192 request.plugin = plugin;
c@96 193 request.configuration = config;
c@96 194
c@96 195 capnp::MallocMessageBuilder message;
c@97 196 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 197
c@96 198 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@96 199 ReqId id = getId();
c@96 200 builder.getId().setNumber(id);
c@96 201
c@134 202 auto karr = call(message, "configure", true);
c@96 203
c@96 204 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 205 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 206
c@96 207 //!!! handle (explicit) error case
c@96 208
c@97 209 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
c@96 210
c@97 211 ConfigurationResponse cr;
c@96 212 VampnProto::readConfigurationResponse(cr,
c@96 213 reader.getResponse().getConfigure(),
c@96 214 m_mapper);
c@96 215
c@142 216 LOG_E("CapnpRRClient::configure returning");
c@142 217
c@96 218 return cr.outputs;
c@96 219 };
c@96 220
c@96 221 virtual
c@96 222 Vamp::Plugin::FeatureSet
c@96 223 process(PluginStub *plugin,
c@96 224 std::vector<std::vector<float> > inputBuffers,
c@96 225 Vamp::RealTime timestamp) override {
c@96 226
c@142 227 LOG_E("CapnpRRClient::process called");
c@142 228
c@96 229 if (!m_transport->isOK()) {
c@134 230 log("Piper server crashed or failed to start (caller should have checked this)");
c@126 231 throw std::runtime_error("Piper server crashed or failed to start");
c@96 232 }
c@96 233
c@97 234 ProcessRequest request;
c@96 235 request.plugin = plugin;
c@96 236 request.inputBuffers = inputBuffers;
c@96 237 request.timestamp = timestamp;
c@96 238
c@96 239 capnp::MallocMessageBuilder message;
c@97 240 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 241 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@118 242 ReqId id = getId();
c@96 243 builder.getId().setNumber(id);
c@96 244
c@134 245 auto karr = call(message, "process", false);
c@96 246
c@96 247 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 248 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 249
c@96 250 //!!! handle (explicit) error case
c@96 251
c@97 252 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
c@96 253
c@97 254 ProcessResponse pr;
c@96 255 VampnProto::readProcessResponse(pr,
c@96 256 reader.getResponse().getProcess(),
c@96 257 m_mapper);
c@96 258
c@142 259 LOG_E("CapnpRRClient::process returning");
c@142 260
c@96 261 return pr.features;
c@96 262 }
c@96 263
c@96 264 virtual Vamp::Plugin::FeatureSet
c@96 265 finish(PluginStub *plugin) override {
c@96 266
c@142 267 LOG_E("CapnpRRClient::finish called");
c@142 268
c@96 269 if (!m_transport->isOK()) {
c@134 270 log("Piper server crashed or failed to start (caller should have checked this)");
c@126 271 throw std::runtime_error("Piper server crashed or failed to start");
c@96 272 }
c@96 273
c@97 274 FinishRequest request;
c@96 275 request.plugin = plugin;
c@96 276
c@96 277 capnp::MallocMessageBuilder message;
c@97 278 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 279
c@96 280 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@96 281 ReqId id = getId();
c@96 282 builder.getId().setNumber(id);
c@96 283
c@134 284 auto karr = call(message, "finish", true);
c@96 285
c@96 286 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 287 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 288
c@96 289 //!!! handle (explicit) error case
c@96 290
c@97 291 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
c@96 292
c@97 293 FinishResponse pr;
c@96 294 VampnProto::readFinishResponse(pr,
c@96 295 reader.getResponse().getFinish(),
c@96 296 m_mapper);
c@96 297
c@96 298 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@96 299
c@118 300 // Don't delete the plugin. It's the plugin that is supposed
c@118 301 // to be calling us here
c@96 302
c@142 303 LOG_E("CapnpRRClient::finish returning");
c@142 304
c@96 305 return pr.features;
c@96 306 }
c@96 307
c@96 308 virtual void
c@96 309 reset(PluginStub *plugin,
c@97 310 PluginConfiguration config) override {
c@96 311
c@96 312 // Reload the plugin on the server side, and configure it as requested
c@134 313
c@134 314 log("CapnpRRClient: reset() called, plugin will be closed and reloaded");
c@96 315
c@96 316 if (!m_transport->isOK()) {
c@134 317 log("Piper server crashed or failed to start (caller should have checked this)");
c@126 318 throw std::runtime_error("Piper server crashed or failed to start");
c@96 319 }
c@96 320
c@96 321 if (m_mapper.havePlugin(plugin)) {
c@96 322 (void)finish(plugin); // server-side unload
c@96 323 }
c@96 324
c@97 325 PluginStaticData psd;
c@97 326 PluginConfiguration defaultConfig;
c@96 327 PluginHandleMapper::Handle handle =
c@96 328 serverLoad(plugin->getPluginKey(),
c@96 329 plugin->getInputSampleRate(),
c@96 330 plugin->getAdapterFlags(),
c@96 331 psd, defaultConfig);
c@96 332
c@96 333 m_mapper.addPlugin(handle, plugin);
c@96 334
c@96 335 (void)configure(plugin, config);
c@96 336 }
c@96 337
c@96 338 private:
c@96 339 AssignedPluginHandleMapper m_mapper;
c@96 340 ReqId getId() {
c@96 341 //!!! todo: mutex
c@96 342 static ReqId m_nextId = 0;
c@96 343 return m_nextId++;
c@96 344 }
c@96 345
c@96 346 static
c@96 347 kj::Array<capnp::word>
c@96 348 toKJArray(const std::vector<char> &buffer) {
c@118 349 // We could do this whole thing with fewer copies, but let's
c@118 350 // see whether it matters first
c@96 351 size_t wordSize = sizeof(capnp::word);
c@118 352 size_t words = buffer.size() / wordSize;
c@118 353 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@118 354 memcpy(karr.begin(), buffer.data(), words * wordSize);
c@118 355 return karr;
c@96 356 }
c@96 357
c@96 358 void
c@97 359 checkResponseType(const piper::RpcResponse::Reader &r,
c@97 360 piper::RpcResponse::Response::Which type,
c@96 361 ReqId id) {
c@96 362
c@96 363 if (r.getResponse().which() != type) {
c@134 364 std::ostringstream s;
c@134 365 s << "checkResponseType: wrong response type (received "
c@134 366 << int(r.getResponse().which()) << ", expected " << int(type) << ")";
c@134 367 log(s.str());
c@96 368 throw std::runtime_error("Wrong response type");
c@96 369 }
c@96 370 if (ReqId(r.getId().getNumber()) != id) {
c@134 371 std::ostringstream s;
c@134 372 s << "checkResponseType: wrong response id (received "
c@134 373 << r.getId().getNumber() << ", expected " << id << ")";
c@134 374 log(s.str());
c@96 375 throw std::runtime_error("Wrong response id");
c@96 376 }
c@96 377 }
c@96 378
c@96 379 kj::Array<capnp::word>
c@134 380 call(capnp::MallocMessageBuilder &message, std::string type, bool slow) {
c@96 381 auto arr = capnp::messageToFlatArray(message);
c@96 382 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@126 383 arr.asChars().size(),
c@134 384 type,
c@126 385 slow);
c@118 386 return toKJArray(responseBuffer);
c@96 387 }
c@96 388
c@96 389 PluginHandleMapper::Handle
c@96 390 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
c@97 391 PluginStaticData &psd,
c@97 392 PluginConfiguration &defaultConfig) {
c@96 393
c@97 394 LoadRequest request;
c@96 395 request.pluginKey = key;
c@96 396 request.inputSampleRate = inputSampleRate;
c@96 397 request.adapterFlags = adapterFlags;
c@96 398
c@96 399 capnp::MallocMessageBuilder message;
c@97 400 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 401
c@96 402 VampnProto::buildRpcRequest_Load(builder, request);
c@96 403 ReqId id = getId();
c@96 404 builder.getId().setNumber(id);
c@96 405
c@134 406 auto karr = call(message, "load", false);
c@96 407
c@96 408 //!!! ... --> will also need some way to kill this process
c@96 409 //!!! (from another thread)
c@96 410
c@96 411 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 412 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 413
c@96 414 //!!! handle (explicit) error case
c@96 415
c@97 416 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
c@96 417
c@97 418 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@96 419 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@96 420 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@96 421 return lr.getHandle();
c@96 422 };
c@96 423
c@96 424 private:
c@134 425 LogCallback *m_logger;
c@96 426 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@96 427 CompletenessChecker *m_completenessChecker; // I own this
c@134 428
c@134 429 void log(std::string message) const {
c@134 430 if (m_logger) m_logger->log(message);
c@134 431 else std::cerr << message << std::endl;
c@134 432 }
c@96 433 };
c@96 434
c@96 435 }
c@96 436 }
c@96 437
c@96 438 #endif