annotate vamp-client/PiperCapnpClient.h @ 90:6429a99abcad

Split out classes
author Chris Cannam <c.cannam@qmul.ac.uk>
date Thu, 13 Oct 2016 10:17:59 +0100
parents
children c897c9a8daf1
rev   line source
c@90 1
c@90 2 #ifndef PIPER_CAPNP_CLIENT_H
c@90 3 #define PIPER_CAPNP_CLIENT_H
c@90 4
#include "PiperClient.h"
#include "SynchronousTransport.h"

#include "vamp-support/AssignedPluginHandleMapper.h"
#include "vamp-capnp/VampnProto.h"

#include <atomic>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
c@90 10
c@90 11 namespace piper { //!!! change
c@90 12
c@90 13 class PiperCapnpClient : public PiperStubPluginClientInterface
c@90 14 {
c@90 15 // unsigned to avoid undefined behaviour on possible wrap
c@90 16 typedef uint32_t ReqId;
c@90 17
c@90 18 public:
c@90 19 PiperCapnpClient(SynchronousTransport *transport) : //!!! ownership? shared ptr?
c@90 20 m_transport(transport) {
c@90 21 }
c@90 22
c@90 23 ~PiperCapnpClient() {
c@90 24 }
c@90 25
c@90 26 //!!! obviously, factor out all repetitive guff
c@90 27
c@90 28 //!!! list and load are supposed to be called by application code,
c@90 29 //!!! but the rest are only supposed to be called by the plugin --
c@90 30 //!!! sort out the api here
c@90 31
c@90 32 Vamp::Plugin *
c@90 33 load(std::string key, float inputSampleRate, int adapterFlags) {
c@90 34
c@90 35 if (!m_transport->isOK()) {
c@90 36 throw std::runtime_error("Piper server failed to start");
c@90 37 }
c@90 38
c@90 39 Vamp::HostExt::LoadRequest request;
c@90 40 request.pluginKey = key;
c@90 41 request.inputSampleRate = inputSampleRate;
c@90 42 request.adapterFlags = adapterFlags;
c@90 43
c@90 44 capnp::MallocMessageBuilder message;
c@90 45 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@90 46
c@90 47 VampnProto::buildRpcRequest_Load(builder, request);
c@90 48 ReqId id = getId();
c@90 49 builder.getId().setNumber(id);
c@90 50
c@90 51 auto arr = messageToFlatArray(message);
c@90 52
c@90 53 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@90 54 arr.asChars().size());
c@90 55
c@90 56 //!!! ... --> will also need some way to kill this process
c@90 57 //!!! (from another thread)
c@90 58
c@90 59 auto karr = toKJArray(responseBuffer);
c@90 60 capnp::FlatArrayMessageReader responseMessage(karr);
c@90 61 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@90 62
c@90 63 //!!! handle (explicit) error case
c@90 64
c@90 65 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
c@90 66
c@90 67 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@90 68
c@90 69 Vamp::HostExt::PluginStaticData psd;
c@90 70 Vamp::HostExt::PluginConfiguration defaultConfig;
c@90 71 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@90 72 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@90 73
c@90 74 Vamp::Plugin *plugin = new PiperStubPlugin(this,
c@90 75 inputSampleRate,
c@90 76 psd,
c@90 77 defaultConfig);
c@90 78
c@90 79 m_mapper.addPlugin(lr.getHandle(), plugin);
c@90 80
c@90 81 return plugin;
c@90 82 };
c@90 83
c@90 84 protected:
c@90 85 virtual
c@90 86 Vamp::Plugin::OutputList
c@90 87 configure(PiperStubPlugin *plugin,
c@90 88 Vamp::HostExt::PluginConfiguration config) override {
c@90 89
c@90 90 if (!m_transport->isOK()) {
c@90 91 throw std::runtime_error("Piper server failed to start");
c@90 92 }
c@90 93
c@90 94 Vamp::HostExt::ConfigurationRequest request;
c@90 95 request.plugin = plugin;
c@90 96 request.configuration = config;
c@90 97
c@90 98 capnp::MallocMessageBuilder message;
c@90 99 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@90 100
c@90 101 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@90 102 ReqId id = getId();
c@90 103 builder.getId().setNumber(id);
c@90 104
c@90 105 auto arr = messageToFlatArray(message);
c@90 106 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@90 107 arr.asChars().size());
c@90 108 auto karr = toKJArray(responseBuffer);
c@90 109 capnp::FlatArrayMessageReader responseMessage(karr);
c@90 110 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@90 111
c@90 112 //!!! handle (explicit) error case
c@90 113
c@90 114 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
c@90 115
c@90 116 Vamp::HostExt::ConfigurationResponse cr;
c@90 117 VampnProto::readConfigurationResponse(cr,
c@90 118 reader.getResponse().getConfigure(),
c@90 119 m_mapper);
c@90 120
c@90 121 return cr.outputs;
c@90 122 };
c@90 123
c@90 124 virtual
c@90 125 Vamp::Plugin::FeatureSet
c@90 126 process(PiperStubPlugin *plugin,
c@90 127 std::vector<std::vector<float> > inputBuffers,
c@90 128 Vamp::RealTime timestamp) override {
c@90 129
c@90 130 if (!m_transport->isOK()) {
c@90 131 throw std::runtime_error("Piper server failed to start");
c@90 132 }
c@90 133
c@90 134 Vamp::HostExt::ProcessRequest request;
c@90 135 request.plugin = plugin;
c@90 136 request.inputBuffers = inputBuffers;
c@90 137 request.timestamp = timestamp;
c@90 138
c@90 139 capnp::MallocMessageBuilder message;
c@90 140 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@90 141
c@90 142 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@90 143 ReqId id = getId();
c@90 144 builder.getId().setNumber(id);
c@90 145
c@90 146 auto arr = messageToFlatArray(message);
c@90 147 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@90 148 arr.asChars().size());
c@90 149 auto karr = toKJArray(responseBuffer);
c@90 150 capnp::FlatArrayMessageReader responseMessage(karr);
c@90 151 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@90 152
c@90 153 //!!! handle (explicit) error case
c@90 154
c@90 155 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
c@90 156
c@90 157 Vamp::HostExt::ProcessResponse pr;
c@90 158 VampnProto::readProcessResponse(pr,
c@90 159 reader.getResponse().getProcess(),
c@90 160 m_mapper);
c@90 161
c@90 162 return pr.features;
c@90 163 }
c@90 164
c@90 165 virtual Vamp::Plugin::FeatureSet
c@90 166 finish(PiperStubPlugin *plugin) override {
c@90 167
c@90 168 if (!m_transport->isOK()) {
c@90 169 throw std::runtime_error("Piper server failed to start");
c@90 170 }
c@90 171
c@90 172 Vamp::HostExt::FinishRequest request;
c@90 173 request.plugin = plugin;
c@90 174
c@90 175 capnp::MallocMessageBuilder message;
c@90 176 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@90 177
c@90 178 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@90 179 ReqId id = getId();
c@90 180 builder.getId().setNumber(id);
c@90 181
c@90 182 auto arr = messageToFlatArray(message);
c@90 183 auto responseBuffer = m_transport->call(arr.asChars().begin(),
c@90 184 arr.asChars().size());
c@90 185 auto karr = toKJArray(responseBuffer);
c@90 186 capnp::FlatArrayMessageReader responseMessage(karr);
c@90 187 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@90 188
c@90 189 //!!! handle (explicit) error case
c@90 190
c@90 191 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
c@90 192
c@90 193 Vamp::HostExt::ProcessResponse pr;
c@90 194 VampnProto::readFinishResponse(pr,
c@90 195 reader.getResponse().getFinish(),
c@90 196 m_mapper);
c@90 197
c@90 198 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@90 199
c@90 200 // Don't delete the plugin. It's the plugin that is supposed
c@90 201 // to be calling us here
c@90 202
c@90 203 return pr.features;
c@90 204 }
c@90 205
c@90 206 private:
c@90 207 AssignedPluginHandleMapper m_mapper;
c@90 208 ReqId getId() {
c@90 209 //!!! todo: mutex
c@90 210 static ReqId m_nextId = 0;
c@90 211 return m_nextId++;
c@90 212 }
c@90 213
c@90 214 kj::Array<capnp::word>
c@90 215 toKJArray(const std::vector<char> &buffer) {
c@90 216 // We could do this whole thing with fewer copies, but let's
c@90 217 // see whether it matters first
c@90 218 size_t wordSize = sizeof(capnp::word);
c@90 219 size_t words = buffer.size() / wordSize;
c@90 220 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@90 221 memcpy(karr.begin(), buffer.data(), words * wordSize);
c@90 222 return karr;
c@90 223 }
c@90 224
c@90 225 void
c@90 226 checkResponseType(const RpcResponse::Reader &r,
c@90 227 RpcResponse::Response::Which type,
c@90 228 ReqId id) {
c@90 229
c@90 230 if (r.getResponse().which() != type) {
c@90 231 throw std::runtime_error("Wrong response type");
c@90 232 }
c@90 233 if (ReqId(r.getId().getNumber()) != id) {
c@90 234 throw std::runtime_error("Wrong response id");
c@90 235 }
c@90 236 }
c@90 237
c@90 238 private:
c@90 239 SynchronousTransport *m_transport; //!!! I don't own this, but should I?
c@90 240 };
c@90 241
c@90 242 }
c@90 243
c@90 244 #endif