annotate vamp-client/client.cpp @ 89:03ed2e0a6c8f

More testing
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 12 Oct 2016 21:34:21 +0100
parents bf2e6f939f9f
children 6429a99abcad
rev   line source
c@78 1
#include "stub.h"

#include "vamp-capnp/VampnProto.h"

#include "vamp-support/AssignedPluginHandleMapper.h"

#include <QProcess>

#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
c@83 11
c@83 12 using std::cerr;
c@83 13 using std::endl;
c@83 14
c@82 15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so
c@82 16 // we're using pipes and the server is completely synchronous,
c@82 17 // handling only one call at once. Our PiperClient will fire off a
c@82 18 // QProcess and refer to its io device. Each request message is
c@82 19 // serialised into memory using capnp::MallocMessageBuilder and
c@82 20 // shunted into the process pipe; we then wait for some bytes to come
c@82 21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when
c@82 22 // a whole message is available, reading only that amount from the
c@82 23 // device and using FlatArrayMessageReader to convert to a response
c@82 24 // message. If the response message's id does not match the request
c@82 25 // message's, then the server has gone wrong (it should never be
c@82 26 // servicing more than one request at a time).
c@82 27
c@82 28 // Next level: Capnp RPC, but I want to get the first level to work
c@83 29 // first, not least because the server already exists.
c@82 30
c@81 31 namespace piper { //!!! probably something different
c@80 32
c@88 33 class PiperClient : public PiperClientStubRequirements
c@80 34 {
c@81 35 // unsigned to avoid undefined behaviour on possible wrap
c@81 36 typedef uint32_t ReqId;
c@81 37
c@80 38 public:
c@83 39 PiperClient() {
c@83 40 m_process = new QProcess();
c@83 41 m_process->setReadChannel(QProcess::StandardOutput);
c@83 42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
c@83 43 m_process->start("../bin/piper-vamp-server"); //!!!
c@83 44 if (!m_process->waitForStarted()) {
c@83 45 cerr << "server failed to start" << endl;
c@83 46 delete m_process;
c@83 47 m_process = 0;
c@83 48 }
c@83 49 }
c@81 50
c@83 51 ~PiperClient() {
c@83 52 if (m_process) {
c@83 53 if (m_process->state() != QProcess::NotRunning) {
c@89 54 m_process->closeWriteChannel();
c@89 55 m_process->waitForFinished(200);
c@83 56 m_process->close();
c@83 57 m_process->waitForFinished();
c@89 58 cerr << "server exited" << endl;
c@83 59 }
c@83 60 delete m_process;
c@83 61 }
c@83 62 }
c@81 63
c@83 64 //!!! obviously, factor out all repetitive guff
c@84 65
c@88 66 //!!! list and load are supposed to be called by application code,
c@88 67 //!!! but the rest are only supposed to be called by the plugin --
c@88 68 //!!! sort out the api here
c@88 69
c@81 70 Vamp::Plugin *
c@81 71 load(std::string key, float inputSampleRate, int adapterFlags) {
c@81 72
c@83 73 if (!m_process) {
c@83 74 throw std::runtime_error("Piper server failed to start");
c@83 75 }
c@83 76
c@81 77 Vamp::HostExt::LoadRequest request;
c@81 78 request.pluginKey = key;
c@81 79 request.inputSampleRate = inputSampleRate;
c@81 80 request.adapterFlags = adapterFlags;
c@81 81
c@84 82 capnp::MallocMessageBuilder message;
c@81 83 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@81 84
c@81 85 VampnProto::buildRpcRequest_Load(builder, request);
c@81 86 ReqId id = getId();
c@81 87 builder.getId().setNumber(id);
c@83 88
c@83 89 auto arr = messageToFlatArray(message);
c@83 90 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@83 91
c@84 92 //!!! ... --> will also need some way to kill this process
c@84 93 //!!! (from another thread)
c@84 94
c@84 95 QByteArray buffer = readResponseBuffer();
c@87 96 auto karr = toKJArray(buffer);
c@87 97 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 98 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 99
c@84 100 //!!! handle (explicit) error case
c@84 101
c@84 102 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
c@84 103
c@84 104 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@84 105
c@84 106 Vamp::HostExt::PluginStaticData psd;
c@84 107 Vamp::HostExt::PluginConfiguration defaultConfig;
c@84 108 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@84 109 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@84 110
c@84 111 Vamp::Plugin *plugin = new PiperStubPlugin(this,
c@84 112 inputSampleRate,
c@84 113 psd,
c@84 114 defaultConfig);
c@84 115
c@84 116 m_mapper.addPlugin(lr.getHandle(), plugin);
c@84 117
c@84 118 return plugin;
c@81 119 };
c@80 120
c@80 121 virtual
c@80 122 Vamp::Plugin::OutputList
c@80 123 configure(PiperStubPlugin *plugin,
c@80 124 Vamp::HostExt::PluginConfiguration config) {
c@80 125
c@83 126 if (!m_process) {
c@83 127 throw std::runtime_error("Piper server failed to start");
c@83 128 }
c@83 129
c@80 130 Vamp::HostExt::ConfigurationRequest request;
c@80 131 request.plugin = plugin;
c@80 132 request.configuration = config;
c@80 133
c@84 134 capnp::MallocMessageBuilder message;
c@80 135 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@80 136
c@80 137 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@81 138 ReqId id = getId();
c@81 139 builder.getId().setNumber(id);
c@84 140
c@84 141 auto arr = messageToFlatArray(message);
c@84 142 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 143
c@84 144 QByteArray buffer = readResponseBuffer();
c@87 145 auto karr = toKJArray(buffer);
c@87 146 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 147 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@80 148
c@84 149 //!!! handle (explicit) error case
c@84 150
c@84 151 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
c@84 152
c@84 153 Vamp::HostExt::ConfigurationResponse cr;
c@84 154 VampnProto::readConfigurationResponse(cr,
c@84 155 reader.getResponse().getConfigure(),
c@84 156 m_mapper);
c@84 157
c@84 158 return cr.outputs;
c@81 159 };
c@80 160
c@80 161 virtual
c@80 162 Vamp::Plugin::FeatureSet
c@80 163 process(PiperStubPlugin *plugin,
c@84 164 std::vector<std::vector<float> > inputBuffers,
c@84 165 Vamp::RealTime timestamp) {
c@84 166
c@84 167 if (!m_process) {
c@84 168 throw std::runtime_error("Piper server failed to start");
c@84 169 }
c@84 170
c@84 171 Vamp::HostExt::ProcessRequest request;
c@84 172 request.plugin = plugin;
c@84 173 request.inputBuffers = inputBuffers;
c@84 174 request.timestamp = timestamp;
c@84 175
c@84 176 capnp::MallocMessageBuilder message;
c@84 177 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 178
c@84 179 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@84 180 ReqId id = getId();
c@84 181 builder.getId().setNumber(id);
c@84 182
c@84 183 auto arr = messageToFlatArray(message);
c@84 184 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 185
c@84 186 QByteArray buffer = readResponseBuffer();
c@87 187 auto karr = toKJArray(buffer);
c@87 188 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 189 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 190
c@84 191 //!!! handle (explicit) error case
c@84 192
c@84 193 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
c@84 194
c@84 195 Vamp::HostExt::ProcessResponse pr;
c@84 196 VampnProto::readProcessResponse(pr,
c@84 197 reader.getResponse().getProcess(),
c@84 198 m_mapper);
c@84 199
c@84 200 return pr.features;
c@84 201 }
c@80 202
c@80 203 virtual Vamp::Plugin::FeatureSet
c@84 204 finish(PiperStubPlugin *plugin) {
c@84 205
c@84 206 if (!m_process) {
c@84 207 throw std::runtime_error("Piper server failed to start");
c@84 208 }
c@84 209
c@84 210 Vamp::HostExt::FinishRequest request;
c@84 211 request.plugin = plugin;
c@84 212
c@84 213 capnp::MallocMessageBuilder message;
c@84 214 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 215
c@84 216 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@84 217 ReqId id = getId();
c@84 218 builder.getId().setNumber(id);
c@84 219
c@84 220 auto arr = messageToFlatArray(message);
c@84 221 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 222
c@84 223 QByteArray buffer = readResponseBuffer();
c@87 224 auto karr = toKJArray(buffer);
c@87 225 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 226 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 227
c@84 228 //!!! handle (explicit) error case
c@84 229
c@84 230 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
c@84 231
c@84 232 Vamp::HostExt::ProcessResponse pr;
c@84 233 VampnProto::readFinishResponse(pr,
c@84 234 reader.getResponse().getFinish(),
c@84 235 m_mapper);
c@84 236
c@84 237 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@88 238
c@88 239 // Don't delete the plugin. It's the plugin that is supposed
c@88 240 // to be calling us here
c@84 241
c@84 242 return pr.features;
c@84 243 }
c@80 244
c@80 245 private:
c@83 246 QProcess *m_process;
c@80 247 AssignedPluginHandleMapper m_mapper;
c@84 248 ReqId getId() {
c@81 249 //!!! todo: mutex
c@81 250 static ReqId m_nextId = 0;
c@81 251 return m_nextId++;
c@81 252 }
c@84 253
c@87 254 kj::Array<capnp::word>
c@87 255 toKJArray(QByteArray qarr) {
c@87 256 // We could do this whole thing with fewer copies, but let's
c@87 257 // see whether it matters first
c@84 258 size_t wordSize = sizeof(capnp::word);
c@87 259 size_t words = qarr.size() / wordSize;
c@87 260 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@87 261 memcpy(karr.begin(), qarr.data(), words * wordSize);
c@87 262 return karr;
c@84 263 }
c@84 264
c@84 265 QByteArray
c@84 266 readResponseBuffer() {
c@84 267
c@84 268 QByteArray buffer;
c@84 269 size_t wordSize = sizeof(capnp::word);
c@84 270 bool complete = false;
c@84 271
c@84 272 while (!complete) {
c@84 273
c@84 274 m_process->waitForReadyRead(1000);
c@84 275 qint64 byteCount = m_process->bytesAvailable();
c@84 276 qint64 wordCount = byteCount / wordSize;
c@84 277
c@84 278 if (!wordCount) {
c@84 279 if (m_process->state() == QProcess::NotRunning) {
c@84 280 cerr << "ERROR: Subprocess exited: Load failed" << endl;
c@84 281 throw std::runtime_error("Piper server exited unexpectedly");
c@84 282 }
c@84 283 } else {
c@84 284 buffer.append(m_process->read(wordCount * wordSize));
c@84 285 size_t haveWords = buffer.size() / wordSize;
c@84 286 size_t expectedWords =
c@87 287 capnp::expectedSizeInWordsFromPrefix(toKJArray(buffer));
c@84 288 if (haveWords >= expectedWords) {
c@84 289 if (haveWords > expectedWords) {
c@84 290 cerr << "WARNING: obtained more data than expected ("
c@84 291 << haveWords << " words, expected " << expectedWords
c@84 292 << ")" << endl;
c@84 293 }
c@84 294 complete = true;
c@84 295 }
c@84 296 }
c@84 297 }
c@87 298 /*
c@85 299 cerr << "buffer = ";
c@85 300 for (int i = 0; i < buffer.size(); ++i) {
c@85 301 if (i % 16 == 0) cerr << "\n";
c@85 302 cerr << int(buffer[i]) << " ";
c@85 303 }
c@85 304 cerr << "\n";
c@87 305 */
c@84 306 return buffer;
c@84 307 }
c@84 308
c@84 309 void
c@84 310 checkResponseType(const RpcResponse::Reader &r,
c@84 311 RpcResponse::Response::Which type,
c@84 312 ReqId id) {
c@84 313
c@84 314 if (r.getResponse().which() != type) {
c@84 315 throw std::runtime_error("Wrong response type");
c@84 316 }
c@84 317 if (ReqId(r.getId().getNumber()) != id) {
c@84 318 throw std::runtime_error("Wrong response id");
c@84 319 }
c@84 320 }
c@80 321 };
c@80 322
c@80 323 }
c@80 324
c@84 325 int main(int, char **)
c@84 326 {
c@84 327 piper::PiperClient client;
c@84 328 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
c@84 329 if (!plugin->initialise(1, 4, 4)) {
c@84 330 cerr << "initialisation failed" << endl;
c@84 331 } else {
c@84 332 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
c@84 333 float *bd = buf.data();
c@84 334 Vamp::Plugin::FeatureSet features = plugin->process
c@84 335 (&bd, Vamp::RealTime::zeroTime);
c@84 336 cerr << "results for output 0:" << endl;
c@84 337 auto fl(features[0]);
c@84 338 for (const auto &f: fl) {
c@84 339 cerr << f.values[0] << endl;
c@84 340 }
c@84 341 }
c@88 342
c@87 343 (void)plugin->getRemainingFeatures();
c@89 344
c@89 345 cerr << "calling reset..." << endl;
c@89 346 plugin->reset();
c@89 347 cerr << "...round 2!" << endl;
c@89 348
c@89 349 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
c@89 350 float *bd = buf.data();
c@89 351 Vamp::Plugin::FeatureSet features = plugin->process
c@89 352 (&bd, Vamp::RealTime::zeroTime);
c@89 353 cerr << "results for output 0:" << endl;
c@89 354 auto fl(features[0]);
c@89 355 for (const auto &f: fl) {
c@89 356 cerr << f.values[0] << endl;
c@89 357 }
c@89 358
c@89 359 (void)plugin->getRemainingFeatures();
c@89 360
c@88 361 delete plugin;
c@87 362 //!!! -- and also implement reset(), which will need to reconstruct internally
c@84 363 }
c@84 364