annotate vamp-client/CapnpRRClient.h @ 98:f55631599988

Plugin that creates its own server
author Chris Cannam <c.cannam@qmul.ac.uk>
date Thu, 13 Oct 2016 19:11:24 +0100
parents 427c4c725085
children bbb99f94e225
rev   line source
c@96 1
c@96 2 #ifndef PIPER_CAPNP_CLIENT_H
c@96 3 #define PIPER_CAPNP_CLIENT_H
c@96 4
#include "Loader.h"
#include "PluginClient.h"
#include "PluginStub.h"
#include "SynchronousTransport.h"

#include "vamp-support/AssignedPluginHandleMapper.h"
#include "vamp-capnp/VampnProto.h"

#include <capnp/serialize.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
c@96 14
c@97 15 namespace piper_vamp {
c@97 16 namespace client {
c@96 17
c@96 18 class CapnpRRClient : public PluginClient,
c@97 19 public Loader
c@96 20 {
c@96 21 // unsigned to avoid undefined behaviour on possible wrap
c@96 22 typedef uint32_t ReqId;
c@96 23
c@96 24 class CompletenessChecker : public MessageCompletenessChecker {
c@96 25 public:
c@96 26 bool isComplete(const std::vector<char> &message) const override {
c@96 27 auto karr = toKJArray(message);
c@96 28 size_t words = karr.size();
c@96 29 size_t expected = capnp::expectedSizeInWordsFromPrefix(karr);
c@96 30 if (words > expected) {
c@96 31 std::cerr << "WARNING: obtained more data than expected ("
c@96 32 << words << " " << sizeof(capnp::word)
c@96 33 << "-byte words, expected "
c@96 34 << expected << ")" << std::endl;
c@96 35 }
c@96 36 return words >= expected;
c@96 37 }
c@96 38 };
c@96 39
c@96 40 public:
c@96 41 CapnpRRClient(SynchronousTransport *transport) : //!!! ownership? shared ptr?
c@96 42 m_transport(transport),
c@96 43 m_completenessChecker(new CompletenessChecker) {
c@96 44 transport->setCompletenessChecker(m_completenessChecker);
c@96 45 }
c@96 46
c@96 47 ~CapnpRRClient() {
c@96 48 delete m_completenessChecker;
c@96 49 }
c@96 50
c@96 51 //!!! obviously, factor out all repetitive guff
c@96 52
c@96 53 //!!! list and load are supposed to be called by application code,
c@96 54 //!!! but the rest are only supposed to be called by the plugin --
c@96 55 //!!! sort out the api here
c@96 56
c@96 57 // Loader methods:
c@96 58
c@97 59 ListResponse
c@96 60 listPluginData() override {
c@96 61
c@96 62 if (!m_transport->isOK()) {
c@96 63 throw std::runtime_error("Piper server failed to start");
c@96 64 }
c@96 65
c@96 66 capnp::MallocMessageBuilder message;
c@97 67 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 68 VampnProto::buildRpcRequest_List(builder);
c@96 69 ReqId id = getId();
c@96 70 builder.getId().setNumber(id);
c@96 71
c@96 72 auto karr = call(message);
c@96 73
c@96 74 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 75 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 76
c@97 77 checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id);
c@96 78
c@97 79 ListResponse lr;
c@96 80 VampnProto::readListResponse(lr, reader.getResponse().getList());
c@96 81 return lr;
c@96 82 }
c@96 83
c@97 84 LoadResponse
c@97 85 loadPlugin(const LoadRequest &req) override {
c@96 86
c@96 87 if (!m_transport->isOK()) {
c@96 88 throw std::runtime_error("Piper server failed to start");
c@96 89 }
c@96 90
c@97 91 LoadResponse resp;
c@96 92 PluginHandleMapper::Handle handle = serverLoad(req.pluginKey,
c@96 93 req.inputSampleRate,
c@96 94 req.adapterFlags,
c@96 95 resp.staticData,
c@96 96 resp.defaultConfiguration);
c@96 97
c@96 98 Vamp::Plugin *plugin = new PluginStub(this,
c@96 99 req.pluginKey,
c@96 100 req.inputSampleRate,
c@96 101 req.adapterFlags,
c@96 102 resp.staticData,
c@96 103 resp.defaultConfiguration);
c@96 104
c@96 105 m_mapper.addPlugin(handle, plugin);
c@96 106
c@96 107 resp.plugin = plugin;
c@96 108 return resp;
c@96 109 }
c@96 110
c@96 111 // PluginClient methods:
c@96 112
c@96 113 virtual
c@96 114 Vamp::Plugin::OutputList
c@96 115 configure(PluginStub *plugin,
c@97 116 PluginConfiguration config) override {
c@96 117
c@96 118 if (!m_transport->isOK()) {
c@96 119 throw std::runtime_error("Piper server failed to start");
c@96 120 }
c@96 121
c@97 122 ConfigurationRequest request;
c@96 123 request.plugin = plugin;
c@96 124 request.configuration = config;
c@96 125
c@96 126 capnp::MallocMessageBuilder message;
c@97 127 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 128
c@96 129 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@96 130 ReqId id = getId();
c@96 131 builder.getId().setNumber(id);
c@96 132
c@96 133 auto karr = call(message);
c@96 134
c@96 135 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 136 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 137
c@96 138 //!!! handle (explicit) error case
c@96 139
c@97 140 checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id);
c@96 141
c@97 142 ConfigurationResponse cr;
c@96 143 VampnProto::readConfigurationResponse(cr,
c@96 144 reader.getResponse().getConfigure(),
c@96 145 m_mapper);
c@96 146
c@96 147 return cr.outputs;
c@96 148 };
c@96 149
c@96 150 virtual
c@96 151 Vamp::Plugin::FeatureSet
c@96 152 process(PluginStub *plugin,
c@96 153 std::vector<std::vector<float> > inputBuffers,
c@96 154 Vamp::RealTime timestamp) override {
c@96 155
c@96 156 if (!m_transport->isOK()) {
c@96 157 throw std::runtime_error("Piper server failed to start");
c@96 158 }
c@96 159
c@97 160 ProcessRequest request;
c@96 161 request.plugin = plugin;
c@96 162 request.inputBuffers = inputBuffers;
c@96 163 request.timestamp = timestamp;
c@96 164
c@96 165 capnp::MallocMessageBuilder message;
c@97 166 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 167 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@96 168 ReqId id = getId();
c@96 169 builder.getId().setNumber(id);
c@96 170
c@96 171 auto karr = call(message);
c@96 172
c@96 173 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 174 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 175
c@96 176 //!!! handle (explicit) error case
c@96 177
c@97 178 checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id);
c@96 179
c@97 180 ProcessResponse pr;
c@96 181 VampnProto::readProcessResponse(pr,
c@96 182 reader.getResponse().getProcess(),
c@96 183 m_mapper);
c@96 184
c@96 185 return pr.features;
c@96 186 }
c@96 187
c@96 188 virtual Vamp::Plugin::FeatureSet
c@96 189 finish(PluginStub *plugin) override {
c@96 190
c@96 191 if (!m_transport->isOK()) {
c@96 192 throw std::runtime_error("Piper server failed to start");
c@96 193 }
c@96 194
c@97 195 FinishRequest request;
c@96 196 request.plugin = plugin;
c@96 197
c@96 198 capnp::MallocMessageBuilder message;
c@97 199 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 200
c@96 201 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@96 202 ReqId id = getId();
c@96 203 builder.getId().setNumber(id);
c@96 204
c@96 205 auto karr = call(message);
c@96 206
c@96 207 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 208 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 209
c@96 210 //!!! handle (explicit) error case
c@96 211
c@97 212 checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id);
c@96 213
c@97 214 FinishResponse pr;
c@96 215 VampnProto::readFinishResponse(pr,
c@96 216 reader.getResponse().getFinish(),
c@96 217 m_mapper);
c@96 218
c@96 219 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@96 220
c@96 221 // Don't delete the plugin. It's the plugin that is supposed
c@96 222 // to be calling us here
c@96 223
c@96 224 return pr.features;
c@96 225 }
c@96 226
c@96 227 virtual void
c@96 228 reset(PluginStub *plugin,
c@97 229 PluginConfiguration config) override {
c@96 230
c@96 231 // Reload the plugin on the server side, and configure it as requested
c@96 232
c@96 233 if (!m_transport->isOK()) {
c@96 234 throw std::runtime_error("Piper server failed to start");
c@96 235 }
c@96 236
c@96 237 if (m_mapper.havePlugin(plugin)) {
c@96 238 (void)finish(plugin); // server-side unload
c@96 239 }
c@96 240
c@97 241 PluginStaticData psd;
c@97 242 PluginConfiguration defaultConfig;
c@96 243 PluginHandleMapper::Handle handle =
c@96 244 serverLoad(plugin->getPluginKey(),
c@96 245 plugin->getInputSampleRate(),
c@96 246 plugin->getAdapterFlags(),
c@96 247 psd, defaultConfig);
c@96 248
c@96 249 m_mapper.addPlugin(handle, plugin);
c@96 250
c@96 251 (void)configure(plugin, config);
c@96 252 }
c@96 253
c@96 254 private:
c@96 255 AssignedPluginHandleMapper m_mapper;
c@96 256 ReqId getId() {
c@96 257 //!!! todo: mutex
c@96 258 static ReqId m_nextId = 0;
c@96 259 return m_nextId++;
c@96 260 }
c@96 261
c@96 262 static
c@96 263 kj::Array<capnp::word>
c@96 264 toKJArray(const std::vector<char> &buffer) {
c@96 265 // We could do this whole thing with fewer copies, but let's
c@96 266 // see whether it matters first
c@96 267 size_t wordSize = sizeof(capnp::word);
c@96 268 size_t words = buffer.size() / wordSize;
c@96 269 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@96 270 memcpy(karr.begin(), buffer.data(), words * wordSize);
c@96 271 return karr;
c@96 272 }
c@96 273
c@96 274 void
c@97 275 checkResponseType(const piper::RpcResponse::Reader &r,
c@97 276 piper::RpcResponse::Response::Which type,
c@96 277 ReqId id) {
c@96 278
c@96 279 if (r.getResponse().which() != type) {
c@96 280 throw std::runtime_error("Wrong response type");
c@96 281 }
c@96 282 if (ReqId(r.getId().getNumber()) != id) {
c@96 283 throw std::runtime_error("Wrong response id");
c@96 284 }
c@96 285 }
c@96 286
c@96 287 kj::Array<capnp::word>
c@96 288 call(capnp::MallocMessageBuilder &message) {
c@96 289 auto arr = capnp::messageToFlatArray(message);
c@96 290 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@96 291 arr.asChars().size());
c@96 292 return toKJArray(responseBuffer);
c@96 293 }
c@96 294
c@96 295 PluginHandleMapper::Handle
c@96 296 serverLoad(std::string key, float inputSampleRate, int adapterFlags,
c@97 297 PluginStaticData &psd,
c@97 298 PluginConfiguration &defaultConfig) {
c@96 299
c@97 300 LoadRequest request;
c@96 301 request.pluginKey = key;
c@96 302 request.inputSampleRate = inputSampleRate;
c@96 303 request.adapterFlags = adapterFlags;
c@96 304
c@96 305 capnp::MallocMessageBuilder message;
c@97 306 piper::RpcRequest::Builder builder = message.initRoot<piper::RpcRequest>();
c@96 307
c@96 308 VampnProto::buildRpcRequest_Load(builder, request);
c@96 309 ReqId id = getId();
c@96 310 builder.getId().setNumber(id);
c@96 311
c@96 312 auto karr = call(message);
c@96 313
c@96 314 //!!! ... --> will also need some way to kill this process
c@96 315 //!!! (from another thread)
c@96 316
c@96 317 capnp::FlatArrayMessageReader responseMessage(karr);
c@97 318 piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
c@96 319
c@96 320 //!!! handle (explicit) error case
c@96 321
c@97 322 checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id);
c@96 323
c@97 324 const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@96 325 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@96 326 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@96 327 return lr.getHandle();
c@96 328 };
c@96 329
c@96 330 private:
c@96 331 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@96 332 CompletenessChecker *m_completenessChecker; // I own this
c@96 333 };
c@96 334
c@96 335 }
c@96 336 }
c@96 337
c@96 338 #endif