annotate vamp-client/CapnpRRClient.h @ 296:50a0b4fea7f1 tip master

Merge pull request #8 from michel-slm/gcc15 Include headers needed to compile with GCC 15's -std=gnu23 default
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 27 Jan 2025 08:53:58 +0000
parents 26027c3a99a0
children
rev   line source
cannam@111 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
c@118 2 /*
c@118 3 Piper C++
c@118 4
c@118 5 An API for audio analysis and feature extraction plugins.
c@118 6
c@118 7 Centre for Digital Music, Queen Mary, University of London.
c@118 8 Copyright 2006-2016 Chris Cannam and QMUL.
c@118 9
c@118 10 Permission is hereby granted, free of charge, to any person
c@118 11 obtaining a copy of this software and associated documentation
c@118 12 files (the "Software"), to deal in the Software without
c@118 13 restriction, including without limitation the rights to use, copy,
c@118 14 modify, merge, publish, distribute, sublicense, and/or sell copies
c@118 15 of the Software, and to permit persons to whom the Software is
c@118 16 furnished to do so, subject to the following conditions:
c@118 17
c@118 18 The above copyright notice and this permission notice shall be
c@118 19 included in all copies or substantial portions of the Software.
c@118 20
c@118 21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
c@118 22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
c@118 23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
c@118 24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
c@118 25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
c@118 26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
c@118 27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
c@118 28
c@118 29 Except as contained in this notice, the names of the Centre for
c@118 30 Digital Music; Queen Mary, University of London; and Chris Cannam
c@118 31 shall not be used in advertising or otherwise to promote the sale,
c@118 32 use or other dealings in this Software without prior written
c@118 33 authorization.
c@118 34 */
c@96 35
c@96 36 #ifndef PIPER_CAPNP_CLIENT_H
c@96 37 #define PIPER_CAPNP_CLIENT_H
c@96 38
c@96 39 #include "Loader.h"
c@96 40 #include "PluginClient.h"
cannam@208 41 #include "PiperVampPlugin.h"
c@96 42 #include "SynchronousTransport.h"
c@96 43
c@96 44 #include "vamp-support/AssignedPluginHandleMapper.h"
c@96 45 #include "vamp-capnp/VampnProto.h"
c@96 46
c@134 47 #include <sstream>
c@134 48
c@96 49 #include <capnp/serialize.h>
c@96 50
c@144 51 //#define LOG_ENTRYPOINTS 1
c@142 52
c@142 53 #ifdef LOG_ENTRYPOINTS
c@142 54 #define LOG_E(x) log(x)
c@142 55 #else
c@142 56 #define LOG_E(x)
c@142 57 #endif
c@142 58
c@97 59 namespace piper_vamp {
c@97 60 namespace client {
c@96 61
c@100 62 /**
c@100 63 * Client for a request-response Piper server, i.e. using the
c@100 64 * RpcRequest/RpcResponse structures with a single process call rather
c@100 65 * than having individual RPC methods, with a synchronous transport
c@100 66 * such as a subprocess pipe arrangement. Only one request can be
c@100 67 * handled at a time. This class is thread-safe if and only if it is
c@100 68 * constructed with a thread-safe SynchronousTransport implementation.
cannam@187 69 *
cannam@187 70 * This class takes Vamp-like structures (Plugin and the classes in
cannam@187 71 * vamp-support) and uses them to communicate with a Piper server
cannam@187 72 * using the Cap'n Proto serialisation of the Piper API. The transport
cannam@187 73 * layer (and thus the nature of the server) is defined by the
cannam@187 74 * SynchronousTransport passed to the constructor.
cannam@187 75 *
cannam@187 76 * This class implements both the Loader interface (which constructs
cannam@187 77 * PluginStub objects) and the PluginClient (which accepts PluginStubs
cannam@187 78 * and maps them into Piper handles).
c@100 79 */
c@96 80 class CapnpRRClient : public PluginClient,
c@118 81 public Loader
c@96 82 {
c@96 83 // unsigned to avoid undefined behaviour on possible wrap
c@96 84 typedef uint32_t ReqId;
c@96 85
c@96 86 class CompletenessChecker : public MessageCompletenessChecker {
c@96 87 public:
c@146 88 State check(const std::vector<char> &message) const override {
c@146 89
c@147 90 if (message.size() < sizeof(capnp::word)) {
c@147 91 return Incomplete;
c@147 92 }
c@147 93
c@96 94 auto karr = toKJArray(message);
c@96 95 size_t words = karr.size();
c@96 96 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
c@146 97
c@146 98 // Lacking a way to definitively check whether a message
c@146 99 // is valid or not, we would still like to trap obvious
c@146 100 // cases where a programming mistake results in garbage
c@146 101 // being returned from the server. We impose a limit on
c@146 102 // message size and, if a prefix is projected to exceed
c@146 103 // that limit, call it invalid. If an extractor wants to
c@146 104 // return a feature set greater than a gigaword in size,
c@146 105 // it'll just have to do it across multiple process calls.
c@146 106 size_t limit = size_t(1) << 30;
c@146 107
c@145 108 // cerr << "CompletenessChecker: message.size() = " << message.size()
c@146 109 // << ", words = " << words << ", limit = " << limit << ", expected = " << expected << endl;
c@146 110
c@96 111 if (words > expected) {
c@96 112 std::cerr << "WARNING: obtained more data than expected ("
c@96 113 << words << " " << sizeof(capnp::word)
c@96 114 << "-byte words, expected "
c@96 115 << expected << ")" << std::endl;
c@146 116 return Complete;
c@146 117 } else if (words == expected) {
c@146 118 return Complete;
c@146 119 } else if (expected > limit) {
c@147 120 std::cerr << "WARNING: apparently invalid message prefix: have "
c@147 121 << words << " words in prefix, projected message size is "
c@147 122 << expected << " against limit of " << limit << std::endl;
c@146 123 return Invalid;
c@146 124 } else {
c@146 125 return Incomplete;
c@96 126 }
c@96 127 }
c@96 128 };
c@96 129
c@96 130 public:
c@134 131 CapnpRRClient(SynchronousTransport *transport, //!!! ownership? shared ptr?
c@134 132 LogCallback *logger) : // logger may be nullptr for cerr
c@134 133 m_logger(logger),
c@96 134 m_transport(transport),
c@96 135 m_completenessChecker(new CompletenessChecker) {
c@96 136 transport->setCompletenessChecker(m_completenessChecker);
c@96 137 }
c@96 138
c@96 139 ~CapnpRRClient() {
c@96 140 delete m_completenessChecker;
c@96 141 }
c@96 142
c@96 143 //!!! obviously, factor out all repetitive guff
c@96 144
c@96 145 //!!! list and load are supposed to be called by application code,
c@96 146 //!!! but the rest are only supposed to be called by the plugin --
c@96 147 //!!! sort out the api here
c@96 148
c@96 149 // Loader methods:
c@96 150
c@97 151 ListResponse
cannam@207 152 list(const ListRequest &req) override {
c@96 153
c@142 154 LOG_E("CapnpRRClient::listPluginData called");
c@142 155
cannam@170 156 checkServerOK();
cannam@170 157
c@96 158 capnp::MallocMessageBuilder message;
c@118 159 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@131 160 VampnProto::buildRpcRequest_List(builder, req);
c@96 161 ReqId id = getId();
c@96 162 builder.getId().setNumber(id);
c@96 163
c@134 164 auto karr = call(message, "list", true);
c@96 165
c@96 166 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 167 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 168
c@97 169 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
c@96 170
c@97 171 ListResponse lr;
c@96 172 VampnProto::readListResponse(lr, reader.getResponse().getList());
c@142 173
c@142 174 LOG_E("CapnpRRClient::listPluginData returning");
c@142 175
c@96 176 return lr;
c@96 177 }
c@96 178
c@97 179 LoadResponse
cannam@207 180 load(const LoadRequest &req) override {
c@96 181
c@142 182 LOG_E("CapnpRRClient::loadPlugin called");
c@142 183
cannam@170 184 checkServerOK();
cannam@170 185
c@97 186 LoadResponse resp;
c@96 187 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
c@96 188 req.inputSampleRate,
c@96 189 req.adapterFlags,
c@96 190 resp.staticData,
cannam@289 191 resp.defaultConfiguration,
cannam@289 192 resp.programParameters);
c@96 193
cannam@208 194 Vamp::Plugin *plugin = new PiperVampPlugin(this,
cannam@208 195 req.pluginKey,
cannam@208 196 req.inputSampleRate,
cannam@208 197 req.adapterFlags,
cannam@208 198 resp.staticData,
cannam@288 199 resp.defaultConfiguration,
cannam@288 200 resp.programParameters);
c@96 201
c@96 202 m_mapper.addPlugin(handle, plugin);
c@96 203
c@96 204 resp.plugin = plugin;
c@142 205
c@142 206 LOG_E("CapnpRRClient::loadPlugin returning");
c@142 207
c@96 208 return resp;
c@96 209 }
c@96 210
c@96 211 // PluginClient methods:
c@96 212
c@96 213 virtual
cannam@185 214 ConfigurationResponse
cannam@208 215 configure(PiperVampPlugin *plugin,
c@97 216 PluginConfiguration config) override {
c@96 217
c@142 218 LOG_E("CapnpRRClient::configure called");
cannam@170 219
cannam@170 220 checkServerOK();
c@142 221
c@97 222 ConfigurationRequest request;
c@96 223 request.plugin = plugin;
c@96 224 request.configuration = config;
c@96 225
c@96 226 capnp::MallocMessageBuilder message;
c@97 227 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 228
c@96 229 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@96 230 ReqId id = getId();
c@96 231 builder.getId().setNumber(id);
c@96 232
c@134 233 auto karr = call(message, "configure", true);
c@96 234
c@96 235 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 236 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 237
c@97 238 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
c@96 239
c@97 240 ConfigurationResponse cr;
c@96 241 VampnProto::readConfigurationResponse(cr,
c@96 242 reader.getResponse().getConfigure(),
c@96 243 m_mapper);
c@96 244
c@142 245 LOG_E("CapnpRRClient::configure returning");
c@142 246
cannam@185 247 return cr;
c@96 248 };
c@96 249
c@96 250 virtual
c@96 251 Vamp::Plugin::FeatureSet
cannam@208 252 process(PiperVampPlugin *plugin,
c@96 253 std::vector<std::vector<float> > inputBuffers,
c@96 254 Vamp::RealTime timestamp) override {
c@96 255
c@142 256 LOG_E("CapnpRRClient::process called");
c@142 257
cannam@170 258 checkServerOK();
cannam@170 259
c@97 260 ProcessRequest request;
c@96 261 request.plugin = plugin;
c@96 262 request.inputBuffers = inputBuffers;
c@96 263 request.timestamp = timestamp;
c@96 264
c@96 265 capnp::MallocMessageBuilder message;
c@97 266 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 267 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@118 268 ReqId id = getId();
c@96 269 builder.getId().setNumber(id);
c@96 270
c@134 271 auto karr = call(message, "process", false);
c@96 272
c@96 273 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 274 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 275
c@97 276 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
c@96 277
c@97 278 ProcessResponse pr;
c@96 279 VampnProto::readProcessResponse(pr,
c@96 280 reader.getResponse().getProcess(),
c@96 281 m_mapper);
c@96 282
c@142 283 LOG_E("CapnpRRClient::process returning");
c@142 284
c@96 285 return pr.features;
c@96 286 }
c@96 287
c@96 288 virtual Vamp::Plugin::FeatureSet
cannam@208 289 finish(PiperVampPlugin *plugin) override {
c@96 290
c@142 291 LOG_E("CapnpRRClient::finish called");
c@142 292
cannam@170 293 checkServerOK();
cannam@170 294
c@97 295 FinishRequest request;
c@96 296 request.plugin = plugin;
c@96 297
c@96 298 capnp::MallocMessageBuilder message;
c@97 299 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 300
c@96 301 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@96 302 ReqId id = getId();
c@96 303 builder.getId().setNumber(id);
c@96 304
c@134 305 auto karr = call(message, "finish", true);
c@96 306
c@96 307 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 308 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 309
c@97 310 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
c@96 311
c@97 312 FinishResponse pr;
c@96 313 VampnProto::readFinishResponse(pr,
c@96 314 reader.getResponse().getFinish(),
c@96 315 m_mapper);
c@96 316
c@96 317 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@96 318
c@118 319 // Don't delete the plugin. It's the plugin that is supposed
c@118 320 // to be calling us here
c@96 321
c@142 322 LOG_E("CapnpRRClient::finish returning");
c@142 323
c@96 324 return pr.features;
c@96 325 }
c@96 326
c@96 327 virtual void
cannam@208 328 reset(PiperVampPlugin *plugin,
c@97 329 PluginConfiguration config) override {
c@96 330
c@96 331 // Reload the plugin on the server side, and configure it as requested
c@134 332
c@134 333 log("CapnpRRClient: reset() called, plugin will be closed and reloaded");
c@96 334
cannam@170 335 checkServerOK();
cannam@170 336
c@96 337 if (m_mapper.havePlugin(plugin)) {
c@96 338 (void)finish(plugin); // server-side unload
c@96 339 }
c@96 340
c@97 341 PluginStaticData psd;
c@97 342 PluginConfiguration defaultConfig;
cannam@289 343 PluginProgramParameters programParameters;
c@96 344 PluginHandleMapper::Handle handle =
c@96 345 serverLoad(plugin->getPluginKey(),
c@96 346 plugin->getInputSampleRate(),
c@96 347 plugin->getAdapterFlags(),
cannam@289 348 psd,
cannam@289 349 defaultConfig,
cannam@289 350 programParameters);
c@96 351
c@96 352 m_mapper.addPlugin(handle, plugin);
c@96 353
c@96 354 (void)configure(plugin, config);
c@96 355 }
c@96 356
c@96 357 private:
c@96 358 AssignedPluginHandleMapper m_mapper;
c@96 359 ReqId getId() {
c@96 360 //!!! todo: mutex
c@96 361 static ReqId m_nextId = 0;
c@96 362 return m_nextId++;
c@96 363 }
c@96 364
c@96 365 static
c@96 366 kj::Array<capnp::word>
c@96 367 toKJArray(const std::vector<char> &buffer) {
c@118 368 // We could do this whole thing with fewer copies, but let's
c@118 369 // see whether it matters first
c@96 370 size_t wordSize = sizeof(capnp::word);
c@118 371 size_t words = buffer.size() / wordSize;
c@118 372 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
cannam@260 373 memcpy(reinterpret_cast<char *>(karr.begin()),
cannam@260 374 buffer.data(),
cannam@260 375 words * wordSize);
c@118 376 return karr;
c@96 377 }
c@96 378
c@96 379 void
cannam@170 380 checkServerOK() {
cannam@170 381 if (!m_transport->isOK()) {
cannam@170 382 log("Piper server crashed or failed to start (caller should have checked this)");
cannam@170 383 throw ServerCrashed();
cannam@170 384 }
cannam@170 385 }
cannam@170 386
cannam@170 387 /**
cannam@170 388 * Check (i) that the response has the same id as supplied (which
cannam@170 389 * presumably is the corresponding request id) and (ii) that the
cannam@170 390 * response has the expected type.
cannam@170 391 *
cannam@170 392 * Return only if both of these things are the case.
cannam@170 393 *
cannam@170 394 * If the response has the right id but is an error response,
cannam@170 395 * throw a ServiceError exception with the error response's
cannam@170 396 * message in it.
cannam@170 397 *
cannam@170 398 * If the response has the wrong id, or if it has the wrong type
cannam@170 399 * and is not an error response, throw ProtocolError. (i.e. for
cannam@170 400 * cases having errors that are not conveyed through our official
cannam@170 401 * error response.)
cannam@170 402 */
cannam@170 403 void
c@97 404 checkResponseType(const piper::RpcResponse::Reader &r,
c@97 405 piper::RpcResponse::Response::Which type,
c@96 406 ReqId id) {
c@96 407
c@96 408 if (ReqId(r.getId().getNumber()) != id) {
c@134 409 std::ostringstream s;
c@134 410 s << "checkResponseType: wrong response id (received "
c@134 411 << r.getId().getNumber() << ", expected " << id << ")";
c@134 412 log(s.str());
cannam@170 413 throw ProtocolError("Wrong response id");
cannam@170 414 }
cannam@170 415 if (r.getResponse().which() != type) {
cannam@170 416 if (r.getResponse().which() == piper::RpcResponse::Response::Which::ERROR) {
cannam@170 417 int code;
cannam@170 418 std::string message;
cannam@170 419 VampnProto::readRpcResponse_Error(code, message, r);
cannam@170 420 std::ostringstream s;
cannam@170 421 s << "checkResponseType: received an error with message: "
cannam@170 422 << message;
cannam@170 423 log(s.str());
cannam@170 424 throw ServiceError(message);
cannam@170 425 } else {
cannam@170 426 std::ostringstream s;
cannam@170 427 s << "checkResponseType: wrong response type (received "
cannam@170 428 << int(r.getResponse().which()) << ", expected " << int(type) << ")";
cannam@170 429 log(s.str());
cannam@170 430 throw ProtocolError("Wrong response type");
cannam@170 431 }
c@96 432 }
c@96 433 }
c@96 434
c@96 435 kj::Array<capnp::word>
c@134 436 call(capnp::MallocMessageBuilder &message, std::string type, bool slow) {
c@96 437 auto arr = capnp::messageToFlatArray(message);
c@96 438 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@126 439 arr.asChars().size(),
c@134 440 type,
c@126 441 slow);
c@118 442 return toKJArray(responseBuffer);
c@96 443 }
c@96 444
c@96 445 PluginHandleMapper::Handle
c@96 446 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
c@97 447 PluginStaticData &psd,
cannam@289 448 PluginConfiguration &defaultConfig,
cannam@289 449 PluginProgramParameters &programParameters) {
c@96 450
c@97 451 LoadRequest request;
c@96 452 request.pluginKey = key;
c@96 453 request.inputSampleRate = inputSampleRate;
c@96 454 request.adapterFlags = adapterFlags;
c@96 455
c@96 456 capnp::MallocMessageBuilder message;
c@97 457 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 458
c@96 459 VampnProto::buildRpcRequest_Load(builder, request);
c@96 460 ReqId id = getId();
c@96 461 builder.getId().setNumber(id);
c@96 462
c@134 463 auto karr = call(message, "load", false);
c@96 464
c@96 465 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 466 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 467
c@97 468 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
c@96 469
c@97 470 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@96 471 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@96 472 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
cannam@289 473 for (auto pp: lr.getProgramParameters()) {
cannam@289 474 VampnProto::readProgramParameterMap(programParameters, pp);
cannam@289 475 }
c@96 476 return lr.getHandle();
c@96 477 };
c@96 478
c@96 479 private:
c@134 480 LogCallback *m_logger;
c@96 481 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@96 482 CompletenessChecker *m_completenessChecker; // I own this
c@134 483
c@134 484 void log(std::string message) const {
c@134 485 if (m_logger) m_logger->log(message);
c@134 486 else std::cerr << message << std::endl;
c@134 487 }
c@96 488 };
c@96 489
c@96 490 }
c@96 491 }
c@96 492
c@96 493 #endif