annotate vamp-client/client.cpp @ 88:bf2e6f939f9f

Rename, + adjust finish
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 12 Oct 2016 19:02:31 +0100
parents 7a77a374b6b2
children 03ed2e0a6c8f
rev   line source
c@78 1
c@78 2 #include "stub.h"
c@78 3
c@80 4 #include "vamp-capnp/VampnProto.h"
c@80 5
c@80 6 #include "vamp-support/AssignedPluginHandleMapper.h"
c@80 7
c@83 8 #include <QProcess>
c@83 9
c@83 10 #include <stdexcept>
c@83 11
c@83 12 using std::cerr;
c@83 13 using std::endl;
c@83 14
c@82 15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so
c@82 16 // we're using pipes and the server is completely synchronous,
c@82 17 // handling only one call at once. Our PiperClient will fire off a
c@82 18 // QProcess and refer to its io device. Each request message is
c@82 19 // serialised into memory using capnp::MallocMessageBuilder and
c@82 20 // shunted into the process pipe; we then wait for some bytes to come
c@82 21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when
c@82 22 // a whole message is available, reading only that amount from the
c@82 23 // device and using FlatArrayMessageReader to convert to a response
c@82 24 // message. If the response message's id does not match the request
c@82 25 // message's, then the server has gone wrong (it should never be
c@82 26 // servicing more than one request at a time).
c@82 27
c@82 28 // Next level: Capnp RPC, but I want to get the first level to work
c@83 29 // first, not least because the server already exists.
c@82 30
c@81 31 namespace piper { //!!! probably something different
c@80 32
c@88 33 class PiperClient : public PiperClientStubRequirements
c@80 34 {
c@81 35 // unsigned to avoid undefined behaviour on possible wrap
c@81 36 typedef uint32_t ReqId;
c@81 37
c@80 38 public:
c@83 39 PiperClient() {
c@83 40 m_process = new QProcess();
c@83 41 m_process->setReadChannel(QProcess::StandardOutput);
c@83 42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
c@83 43 m_process->start("../bin/piper-vamp-server"); //!!!
c@83 44 if (!m_process->waitForStarted()) {
c@83 45 cerr << "server failed to start" << endl;
c@83 46 delete m_process;
c@83 47 m_process = 0;
c@83 48 }
c@83 49 }
c@81 50
c@83 51 ~PiperClient() {
c@83 52 if (m_process) {
c@83 53 if (m_process->state() != QProcess::NotRunning) {
c@83 54 m_process->close();
c@83 55 m_process->waitForFinished();
c@83 56 }
c@83 57 delete m_process;
c@83 58 }
c@83 59 }
c@81 60
c@83 61 //!!! obviously, factor out all repetitive guff
c@84 62
c@88 63 //!!! list and load are supposed to be called by application code,
c@88 64 //!!! but the rest are only supposed to be called by the plugin --
c@88 65 //!!! sort out the api here
c@88 66
c@81 67 Vamp::Plugin *
c@81 68 load(std::string key, float inputSampleRate, int adapterFlags) {
c@81 69
c@83 70 if (!m_process) {
c@83 71 throw std::runtime_error("Piper server failed to start");
c@83 72 }
c@83 73
c@81 74 Vamp::HostExt::LoadRequest request;
c@81 75 request.pluginKey = key;
c@81 76 request.inputSampleRate = inputSampleRate;
c@81 77 request.adapterFlags = adapterFlags;
c@81 78
c@84 79 capnp::MallocMessageBuilder message;
c@81 80 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@81 81
c@81 82 VampnProto::buildRpcRequest_Load(builder, request);
c@81 83 ReqId id = getId();
c@81 84 builder.getId().setNumber(id);
c@83 85
c@83 86 auto arr = messageToFlatArray(message);
c@83 87 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@83 88
c@84 89 //!!! ... --> will also need some way to kill this process
c@84 90 //!!! (from another thread)
c@84 91
c@84 92 QByteArray buffer = readResponseBuffer();
c@87 93 auto karr = toKJArray(buffer);
c@87 94 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 95 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 96
c@84 97 //!!! handle (explicit) error case
c@84 98
c@84 99 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
c@84 100
c@84 101 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@84 102
c@84 103 Vamp::HostExt::PluginStaticData psd;
c@84 104 Vamp::HostExt::PluginConfiguration defaultConfig;
c@84 105 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@84 106 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@84 107
c@84 108 Vamp::Plugin *plugin = new PiperStubPlugin(this,
c@84 109 inputSampleRate,
c@84 110 psd,
c@84 111 defaultConfig);
c@84 112
c@84 113 m_mapper.addPlugin(lr.getHandle(), plugin);
c@84 114
c@84 115 return plugin;
c@81 116 };
c@80 117
c@80 118 virtual
c@80 119 Vamp::Plugin::OutputList
c@80 120 configure(PiperStubPlugin *plugin,
c@80 121 Vamp::HostExt::PluginConfiguration config) {
c@80 122
c@83 123 if (!m_process) {
c@83 124 throw std::runtime_error("Piper server failed to start");
c@83 125 }
c@83 126
c@80 127 Vamp::HostExt::ConfigurationRequest request;
c@80 128 request.plugin = plugin;
c@80 129 request.configuration = config;
c@80 130
c@84 131 capnp::MallocMessageBuilder message;
c@80 132 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@80 133
c@80 134 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@81 135 ReqId id = getId();
c@81 136 builder.getId().setNumber(id);
c@84 137
c@84 138 auto arr = messageToFlatArray(message);
c@84 139 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 140
c@84 141 QByteArray buffer = readResponseBuffer();
c@87 142 auto karr = toKJArray(buffer);
c@87 143 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 144 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@80 145
c@84 146 //!!! handle (explicit) error case
c@84 147
c@84 148 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
c@84 149
c@84 150 Vamp::HostExt::ConfigurationResponse cr;
c@84 151 VampnProto::readConfigurationResponse(cr,
c@84 152 reader.getResponse().getConfigure(),
c@84 153 m_mapper);
c@84 154
c@84 155 return cr.outputs;
c@81 156 };
c@80 157
c@80 158 virtual
c@80 159 Vamp::Plugin::FeatureSet
c@80 160 process(PiperStubPlugin *plugin,
c@84 161 std::vector<std::vector<float> > inputBuffers,
c@84 162 Vamp::RealTime timestamp) {
c@84 163
c@84 164 if (!m_process) {
c@84 165 throw std::runtime_error("Piper server failed to start");
c@84 166 }
c@84 167
c@84 168 Vamp::HostExt::ProcessRequest request;
c@84 169 request.plugin = plugin;
c@84 170 request.inputBuffers = inputBuffers;
c@84 171 request.timestamp = timestamp;
c@84 172
c@84 173 capnp::MallocMessageBuilder message;
c@84 174 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 175
c@84 176 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@84 177 ReqId id = getId();
c@84 178 builder.getId().setNumber(id);
c@84 179
c@84 180 auto arr = messageToFlatArray(message);
c@84 181 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 182
c@84 183 QByteArray buffer = readResponseBuffer();
c@87 184 auto karr = toKJArray(buffer);
c@87 185 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 186 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 187
c@84 188 //!!! handle (explicit) error case
c@84 189
c@84 190 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
c@84 191
c@84 192 Vamp::HostExt::ProcessResponse pr;
c@84 193 VampnProto::readProcessResponse(pr,
c@84 194 reader.getResponse().getProcess(),
c@84 195 m_mapper);
c@84 196
c@84 197 return pr.features;
c@84 198 }
c@80 199
c@80 200 virtual Vamp::Plugin::FeatureSet
c@84 201 finish(PiperStubPlugin *plugin) {
c@84 202
c@84 203 if (!m_process) {
c@84 204 throw std::runtime_error("Piper server failed to start");
c@84 205 }
c@84 206
c@84 207 Vamp::HostExt::FinishRequest request;
c@84 208 request.plugin = plugin;
c@84 209
c@84 210 capnp::MallocMessageBuilder message;
c@84 211 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 212
c@84 213 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@84 214 ReqId id = getId();
c@84 215 builder.getId().setNumber(id);
c@84 216
c@84 217 auto arr = messageToFlatArray(message);
c@84 218 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 219
c@84 220 QByteArray buffer = readResponseBuffer();
c@87 221 auto karr = toKJArray(buffer);
c@87 222 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 223 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 224
c@84 225 //!!! handle (explicit) error case
c@84 226
c@84 227 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
c@84 228
c@84 229 Vamp::HostExt::ProcessResponse pr;
c@84 230 VampnProto::readFinishResponse(pr,
c@84 231 reader.getResponse().getFinish(),
c@84 232 m_mapper);
c@84 233
c@84 234 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@88 235
c@88 236 // Don't delete the plugin. It's the plugin that is supposed
c@88 237 // to be calling us here
c@84 238
c@84 239 return pr.features;
c@84 240 }
c@80 241
c@80 242 private:
c@83 243 QProcess *m_process;
c@80 244 AssignedPluginHandleMapper m_mapper;
c@84 245 ReqId getId() {
c@81 246 //!!! todo: mutex
c@81 247 static ReqId m_nextId = 0;
c@81 248 return m_nextId++;
c@81 249 }
c@84 250
c@87 251 kj::Array<capnp::word>
c@87 252 toKJArray(QByteArray qarr) {
c@87 253 // We could do this whole thing with fewer copies, but let's
c@87 254 // see whether it matters first
c@84 255 size_t wordSize = sizeof(capnp::word);
c@87 256 size_t words = qarr.size() / wordSize;
c@87 257 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@87 258 memcpy(karr.begin(), qarr.data(), words * wordSize);
c@87 259 return karr;
c@84 260 }
c@84 261
c@84 262 QByteArray
c@84 263 readResponseBuffer() {
c@84 264
c@84 265 QByteArray buffer;
c@84 266 size_t wordSize = sizeof(capnp::word);
c@84 267 bool complete = false;
c@84 268
c@84 269 while (!complete) {
c@84 270
c@84 271 m_process->waitForReadyRead(1000);
c@84 272 qint64 byteCount = m_process->bytesAvailable();
c@84 273 qint64 wordCount = byteCount / wordSize;
c@84 274
c@84 275 if (!wordCount) {
c@84 276 if (m_process->state() == QProcess::NotRunning) {
c@84 277 cerr << "ERROR: Subprocess exited: Load failed" << endl;
c@84 278 throw std::runtime_error("Piper server exited unexpectedly");
c@84 279 }
c@84 280 } else {
c@84 281 buffer.append(m_process->read(wordCount * wordSize));
c@84 282 size_t haveWords = buffer.size() / wordSize;
c@84 283 size_t expectedWords =
c@87 284 capnp::expectedSizeInWordsFromPrefix(toKJArray(buffer));
c@84 285 if (haveWords >= expectedWords) {
c@84 286 if (haveWords > expectedWords) {
c@84 287 cerr << "WARNING: obtained more data than expected ("
c@84 288 << haveWords << " words, expected " << expectedWords
c@84 289 << ")" << endl;
c@84 290 }
c@84 291 complete = true;
c@84 292 }
c@84 293 }
c@84 294 }
c@87 295 /*
c@85 296 cerr << "buffer = ";
c@85 297 for (int i = 0; i < buffer.size(); ++i) {
c@85 298 if (i % 16 == 0) cerr << "\n";
c@85 299 cerr << int(buffer[i]) << " ";
c@85 300 }
c@85 301 cerr << "\n";
c@87 302 */
c@84 303 return buffer;
c@84 304 }
c@84 305
c@84 306 void
c@84 307 checkResponseType(const RpcResponse::Reader &r,
c@84 308 RpcResponse::Response::Which type,
c@84 309 ReqId id) {
c@84 310
c@84 311 if (r.getResponse().which() != type) {
c@84 312 throw std::runtime_error("Wrong response type");
c@84 313 }
c@84 314 if (ReqId(r.getId().getNumber()) != id) {
c@84 315 throw std::runtime_error("Wrong response id");
c@84 316 }
c@84 317 }
c@80 318 };
c@80 319
c@80 320 }
c@80 321
c@84 322 int main(int, char **)
c@84 323 {
c@84 324 piper::PiperClient client;
c@84 325 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
c@84 326 if (!plugin->initialise(1, 4, 4)) {
c@84 327 cerr << "initialisation failed" << endl;
c@84 328 } else {
c@84 329 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
c@84 330 float *bd = buf.data();
c@84 331 Vamp::Plugin::FeatureSet features = plugin->process
c@84 332 (&bd, Vamp::RealTime::zeroTime);
c@84 333 cerr << "results for output 0:" << endl;
c@84 334 auto fl(features[0]);
c@84 335 for (const auto &f: fl) {
c@84 336 cerr << f.values[0] << endl;
c@84 337 }
c@84 338 }
c@88 339
c@87 340 (void)plugin->getRemainingFeatures();
c@88 341 delete plugin;
c@87 342 //!!! -- and also implement reset(), which will need to reconstruct internally
c@84 343 }
c@84 344