annotate vamp-client/CapnpRRClient.h @ 106:a0edd7c97d2d

Add generated files -- they aren't supposed to change (much?) I think and it's better than expecting the compiler to be present on every platform
author Chris Cannam <c.cannam@qmul.ac.uk>
date Mon, 17 Oct 2016 18:56:18 +0100
parents 8c449824e08d
children d74dfc11927c
rev   line source
c@96 1
c@96 2 #ifndef PIPER_CAPNP_CLIENT_H
c@96 3 #define PIPER_CAPNP_CLIENT_H
c@96 4
#include "Loader.h"
#include "PluginClient.h"
#include "PluginStub.h"
#include "SynchronousTransport.h"

#include "vamp-support/AssignedPluginHandleMapper.h"
#include "vamp-capnp/VampnProto.h"

#include <capnp/serialize.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
c@96 14
c@97 15 namespace piper_vamp {
c@97 16 namespace client {
c@96 17
c@100 18 /**
c@100 19 * Client for a request-response Piper server, i.e. using the
c@100 20 * RpcRequest/RpcResponse structures with a single process call rather
c@100 21 * than having individual RPC methods, with a synchronous transport
c@100 22 * such as a subprocess pipe arrangement. Only one request can be
c@100 23 * handled at a time. This class is thread-safe if and only if it is
c@100 24 * constructed with a thread-safe SynchronousTransport implementation.
c@100 25 */
c@96 26 class CapnpRRClient : public PluginClient,
c@97 27 public Loader
c@96 28 {
c@96 29 // unsigned to avoid undefined behaviour on possible wrap
c@96 30 typedef uint32_t ReqId;
c@96 31
c@96 32 class CompletenessChecker : public MessageCompletenessChecker {
c@96 33 public:
c@96 34 bool isComplete(const std::vector<char> &message) const override {
c@96 35 auto karr = toKJArray(message);
c@96 36 size_t words = karr.size();
c@96 37 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
c@96 38 if (words > expected) {
c@96 39 std::cerr << "WARNING: obtained more data than expected ("
c@96 40 << words << " " << sizeof(capnp::word)
c@96 41 << "-byte words, expected "
c@96 42 << expected << ")" << std::endl;
c@96 43 }
c@96 44 return words >= expected;
c@96 45 }
c@96 46 };
c@96 47
c@96 48 public:
c@96 49 CapnpRRClient(SynchronousTransport *transport) : //!!! ownership? shared ptr?
c@96 50 m_transport(transport),
c@96 51 m_completenessChecker(new CompletenessChecker) {
c@96 52 transport->setCompletenessChecker(m_completenessChecker);
c@96 53 }
c@96 54
c@96 55 ~CapnpRRClient() {
c@96 56 delete m_completenessChecker;
c@96 57 }
c@96 58
c@96 59 //!!! obviously, factor out all repetitive guff
c@96 60
c@96 61 //!!! list and load are supposed to be called by application code,
c@96 62 //!!! but the rest are only supposed to be called by the plugin --
c@96 63 //!!! sort out the api here
c@96 64
c@96 65 // Loader methods:
c@96 66
c@97 67 ListResponse
c@96 68 listPluginData() override {
c@96 69
c@96 70 if (!m_transport->isOK()) {
c@96 71 throw std::runtime_error("Piper server failed to start");
c@96 72 }
c@96 73
c@96 74 capnp::MallocMessageBuilder message;
c@97 75 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 76 VampnProto::buildRpcRequest_List(builder);
c@96 77 ReqId id = getId();
c@96 78 builder.getId().setNumber(id);
c@96 79
c@96 80 auto karr = call(message);
c@96 81
c@96 82 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 83 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 84
c@97 85 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
c@96 86
c@97 87 ListResponse lr;
c@96 88 VampnProto::readListResponse(lr, reader.getResponse().getList());
c@96 89 return lr;
c@96 90 }
c@96 91
c@97 92 LoadResponse
c@97 93 loadPlugin(const LoadRequest &req) override {
c@96 94
c@96 95 if (!m_transport->isOK()) {
c@96 96 throw std::runtime_error("Piper server failed to start");
c@96 97 }
c@96 98
c@97 99 LoadResponse resp;
c@96 100 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
c@96 101 req.inputSampleRate,
c@96 102 req.adapterFlags,
c@96 103 resp.staticData,
c@96 104 resp.defaultConfiguration);
c@96 105
c@96 106 Vamp::Plugin *plugin = new PluginStub(this,
c@96 107 req.pluginKey,
c@96 108 req.inputSampleRate,
c@96 109 req.adapterFlags,
c@96 110 resp.staticData,
c@96 111 resp.defaultConfiguration);
c@96 112
c@96 113 m_mapper.addPlugin(handle, plugin);
c@96 114
c@96 115 resp.plugin = plugin;
c@96 116 return resp;
c@96 117 }
c@96 118
c@96 119 // PluginClient methods:
c@96 120
c@96 121 virtual
c@96 122 Vamp::Plugin::OutputList
c@96 123 configure(PluginStub *plugin,
c@97 124 PluginConfiguration config) override {
c@96 125
c@96 126 if (!m_transport->isOK()) {
c@96 127 throw std::runtime_error("Piper server failed to start");
c@96 128 }
c@96 129
c@97 130 ConfigurationRequest request;
c@96 131 request.plugin = plugin;
c@96 132 request.configuration = config;
c@96 133
c@96 134 capnp::MallocMessageBuilder message;
c@97 135 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 136
c@96 137 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@96 138 ReqId id = getId();
c@96 139 builder.getId().setNumber(id);
c@96 140
c@96 141 auto karr = call(message);
c@96 142
c@96 143 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 144 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 145
c@96 146 //!!! handle (explicit) error case
c@96 147
c@97 148 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
c@96 149
c@97 150 ConfigurationResponse cr;
c@96 151 VampnProto::readConfigurationResponse(cr,
c@96 152 reader.getResponse().getConfigure(),
c@96 153 m_mapper);
c@96 154
c@96 155 return cr.outputs;
c@96 156 };
c@96 157
c@96 158 virtual
c@96 159 Vamp::Plugin::FeatureSet
c@96 160 process(PluginStub *plugin,
c@96 161 std::vector<std::vector<float> > inputBuffers,
c@96 162 Vamp::RealTime timestamp) override {
c@96 163
c@96 164 if (!m_transport->isOK()) {
c@96 165 throw std::runtime_error("Piper server failed to start");
c@96 166 }
c@96 167
c@97 168 ProcessRequest request;
c@96 169 request.plugin = plugin;
c@96 170 request.inputBuffers = inputBuffers;
c@96 171 request.timestamp = timestamp;
c@96 172
c@96 173 capnp::MallocMessageBuilder message;
c@97 174 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 175 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@96 176 ReqId id = getId();
c@96 177 builder.getId().setNumber(id);
c@96 178
c@96 179 auto karr = call(message);
c@96 180
c@96 181 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 182 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 183
c@96 184 //!!! handle (explicit) error case
c@96 185
c@97 186 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
c@96 187
c@97 188 ProcessResponse pr;
c@96 189 VampnProto::readProcessResponse(pr,
c@96 190 reader.getResponse().getProcess(),
c@96 191 m_mapper);
c@96 192
c@96 193 return pr.features;
c@96 194 }
c@96 195
c@96 196 virtual Vamp::Plugin::FeatureSet
c@96 197 finish(PluginStub *plugin) override {
c@96 198
c@96 199 if (!m_transport->isOK()) {
c@96 200 throw std::runtime_error("Piper server failed to start");
c@96 201 }
c@96 202
c@97 203 FinishRequest request;
c@96 204 request.plugin = plugin;
c@96 205
c@96 206 capnp::MallocMessageBuilder message;
c@97 207 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 208
c@96 209 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@96 210 ReqId id = getId();
c@96 211 builder.getId().setNumber(id);
c@96 212
c@96 213 auto karr = call(message);
c@96 214
c@96 215 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 216 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 217
c@96 218 //!!! handle (explicit) error case
c@96 219
c@97 220 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
c@96 221
c@97 222 FinishResponse pr;
c@96 223 VampnProto::readFinishResponse(pr,
c@96 224 reader.getResponse().getFinish(),
c@96 225 m_mapper);
c@96 226
c@96 227 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@96 228
c@96 229 // Don't delete the plugin. It's the plugin that is supposed
c@96 230 // to be calling us here
c@96 231
c@96 232 return pr.features;
c@96 233 }
c@96 234
c@96 235 virtual void
c@96 236 reset(PluginStub *plugin,
c@97 237 PluginConfiguration config) override {
c@96 238
c@96 239 // Reload the plugin on the server side, and configure it as requested
c@96 240
c@96 241 if (!m_transport->isOK()) {
c@96 242 throw std::runtime_error("Piper server failed to start");
c@96 243 }
c@96 244
c@96 245 if (m_mapper.havePlugin(plugin)) {
c@96 246 (void)finish(plugin); // server-side unload
c@96 247 }
c@96 248
c@97 249 PluginStaticData psd;
c@97 250 PluginConfiguration defaultConfig;
c@96 251 PluginHandleMapper::Handle handle =
c@96 252 serverLoad(plugin->getPluginKey(),
c@96 253 plugin->getInputSampleRate(),
c@96 254 plugin->getAdapterFlags(),
c@96 255 psd, defaultConfig);
c@96 256
c@96 257 m_mapper.addPlugin(handle, plugin);
c@96 258
c@96 259 (void)configure(plugin, config);
c@96 260 }
c@96 261
c@96 262 private:
c@96 263 AssignedPluginHandleMapper m_mapper;
c@96 264 ReqId getId() {
c@96 265 //!!! todo: mutex
c@96 266 static ReqId m_nextId = 0;
c@96 267 return m_nextId++;
c@96 268 }
c@96 269
c@96 270 static
c@96 271 kj::Array<capnp::word>
c@96 272 toKJArray(const std::vector<char> &buffer) {
c@96 273 // We could do this whole thing with fewer copies, but let's
c@96 274 // see whether it matters first
c@96 275 size_t wordSize = sizeof(capnp::word);
c@96 276 size_t words = buffer.size() / wordSize;
c@96 277 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@96 278 memcpy(karr.begin(), buffer.data(), words * wordSize);
c@96 279 return karr;
c@96 280 }
c@96 281
c@96 282 void
c@97 283 checkResponseType(const piper::RpcResponse::Reader &r,
c@97 284 piper::RpcResponse::Response::Which type,
c@96 285 ReqId id) {
c@96 286
c@96 287 if (r.getResponse().which() != type) {
c@101 288 std::cerr << "checkResponseType: wrong response type (received "
c@101 289 << r.getResponse().which() << ", expected " << type << ")"
c@101 290 << std::endl;
c@96 291 throw std::runtime_error("Wrong response type");
c@96 292 }
c@96 293 if (ReqId(r.getId().getNumber()) != id) {
c@101 294 std::cerr << "checkResponseType: wrong response id (received "
c@101 295 << r.getId().getNumber() << ", expected " << id << ")"
c@101 296 << std::endl;
c@96 297 throw std::runtime_error("Wrong response id");
c@96 298 }
c@96 299 }
c@96 300
c@96 301 kj::Array<capnp::word>
c@96 302 call(capnp::MallocMessageBuilder &message) {
c@96 303 auto arr = capnp::messageToFlatArray(message);
c@96 304 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@96 305 arr.asChars().size());
c@96 306 return toKJArray(responseBuffer);
c@96 307 }
c@96 308
c@96 309 PluginHandleMapper::Handle
c@96 310 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
c@97 311 PluginStaticData &psd,
c@97 312 PluginConfiguration &defaultConfig) {
c@96 313
c@97 314 LoadRequest request;
c@96 315 request.pluginKey = key;
c@96 316 request.inputSampleRate = inputSampleRate;
c@96 317 request.adapterFlags = adapterFlags;
c@96 318
c@96 319 capnp::MallocMessageBuilder message;
c@97 320 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 321
c@96 322 VampnProto::buildRpcRequest_Load(builder, request);
c@96 323 ReqId id = getId();
c@96 324 builder.getId().setNumber(id);
c@96 325
c@96 326 auto karr = call(message);
c@96 327
c@96 328 //!!! ... --> will also need some way to kill this process
c@96 329 //!!! (from another thread)
c@96 330
c@96 331 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 332 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 333
c@96 334 //!!! handle (explicit) error case
c@96 335
c@97 336 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
c@96 337
c@97 338 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@96 339 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@96 340 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@96 341 return lr.getHandle();
c@96 342 };
c@96 343
c@96 344 private:
c@96 345 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@96 346 CompletenessChecker *m_completenessChecker; // I own this
c@96 347 };
c@96 348
c@96 349 }
c@96 350 }
c@96 351
c@96 352 #endif