annotate vamp-client/client.cpp @ 87:7a77a374b6b2

Fix decoding error due to misaligned array
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 12 Oct 2016 17:47:59 +0100
parents 1b7c11bc5a88
children bf2e6f939f9f
rev   line source
c@78 1
c@78 2 #include "stub.h"
c@78 3
c@80 4 #include "vamp-capnp/VampnProto.h"
c@80 5
c@80 6 #include "vamp-support/AssignedPluginHandleMapper.h"
c@80 7
c@83 8 #include <QProcess>
c@83 9
c@83 10 #include <stdexcept>
c@83 11
c@83 12 using std::cerr;
c@83 13 using std::endl;
c@83 14
c@82 15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so
c@82 16 // we're using pipes and the server is completely synchronous,
c@82 17 // handling only one call at once. Our PiperClient will fire off a
c@82 18 // QProcess and refer to its io device. Each request message is
c@82 19 // serialised into memory using capnp::MallocMessageBuilder and
c@82 20 // shunted into the process pipe; we then wait for some bytes to come
c@82 21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when
c@82 22 // a whole message is available, reading only that amount from the
c@82 23 // device and using FlatArrayMessageReader to convert to a response
c@82 24 // message. If the response message's id does not match the request
c@82 25 // message's, then the server has gone wrong (it should never be
c@82 26 // servicing more than one request at a time).
c@82 27
c@82 28 // Next level: Capnp RPC, but I want to get the first level to work
c@83 29 // first, not least because the server already exists.
c@82 30
c@81 31 namespace piper { //!!! probably something different
c@80 32
c@80 33 class PiperClient : public PiperClientBase
c@80 34 {
c@81 35 // unsigned to avoid undefined behaviour on possible wrap
c@81 36 typedef uint32_t ReqId;
c@81 37
c@80 38 public:
c@83 39 PiperClient() {
c@83 40 m_process = new QProcess();
c@83 41 m_process->setReadChannel(QProcess::StandardOutput);
c@83 42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
c@83 43 m_process->start("../bin/piper-vamp-server"); //!!!
c@83 44 if (!m_process->waitForStarted()) {
c@83 45 cerr << "server failed to start" << endl;
c@83 46 delete m_process;
c@83 47 m_process = 0;
c@83 48 }
c@83 49 }
c@81 50
c@83 51 ~PiperClient() {
c@83 52 if (m_process) {
c@83 53 if (m_process->state() != QProcess::NotRunning) {
c@83 54 m_process->close();
c@83 55 m_process->waitForFinished();
c@83 56 }
c@83 57 delete m_process;
c@83 58 }
c@83 59 }
c@81 60
c@83 61 //!!! obviously, factor out all repetitive guff
c@84 62
c@81 63 Vamp::Plugin *
c@81 64 load(std::string key, float inputSampleRate, int adapterFlags) {
c@81 65
c@83 66 if (!m_process) {
c@83 67 throw std::runtime_error("Piper server failed to start");
c@83 68 }
c@83 69
c@81 70 Vamp::HostExt::LoadRequest request;
c@81 71 request.pluginKey = key;
c@81 72 request.inputSampleRate = inputSampleRate;
c@81 73 request.adapterFlags = adapterFlags;
c@81 74
c@84 75 capnp::MallocMessageBuilder message;
c@81 76 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@81 77
c@81 78 VampnProto::buildRpcRequest_Load(builder, request);
c@81 79 ReqId id = getId();
c@81 80 builder.getId().setNumber(id);
c@83 81
c@83 82 auto arr = messageToFlatArray(message);
c@83 83 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@83 84
c@84 85 //!!! ... --> will also need some way to kill this process
c@84 86 //!!! (from another thread)
c@84 87
c@84 88 QByteArray buffer = readResponseBuffer();
c@87 89 auto karr = toKJArray(buffer);
c@87 90 capnp::FlatArrayMessageReader responseMessage(karr);
c@87 91 cerr << "made reader" << endl;
c@84 92 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 93
c@84 94 //!!! handle (explicit) error case
c@84 95
c@84 96 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
c@84 97
c@84 98 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@84 99
c@84 100 Vamp::HostExt::PluginStaticData psd;
c@84 101 Vamp::HostExt::PluginConfiguration defaultConfig;
c@84 102 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@84 103 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@84 104
c@84 105 Vamp::Plugin *plugin = new PiperStubPlugin(this,
c@84 106 inputSampleRate,
c@84 107 psd,
c@84 108 defaultConfig);
c@84 109
c@84 110 m_mapper.addPlugin(lr.getHandle(), plugin);
c@84 111
c@84 112 return plugin;
c@81 113 };
c@80 114
c@80 115 virtual
c@80 116 Vamp::Plugin::OutputList
c@80 117 configure(PiperStubPlugin *plugin,
c@80 118 Vamp::HostExt::PluginConfiguration config) {
c@80 119
c@83 120 if (!m_process) {
c@83 121 throw std::runtime_error("Piper server failed to start");
c@83 122 }
c@83 123
c@80 124 Vamp::HostExt::ConfigurationRequest request;
c@80 125 request.plugin = plugin;
c@80 126 request.configuration = config;
c@80 127
c@84 128 capnp::MallocMessageBuilder message;
c@80 129 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@80 130
c@80 131 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@81 132 ReqId id = getId();
c@81 133 builder.getId().setNumber(id);
c@84 134
c@84 135 auto arr = messageToFlatArray(message);
c@84 136 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 137
c@84 138 QByteArray buffer = readResponseBuffer();
c@87 139 auto karr = toKJArray(buffer);
c@87 140 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 141 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@80 142
c@84 143 //!!! handle (explicit) error case
c@84 144
c@84 145 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
c@84 146
c@84 147 Vamp::HostExt::ConfigurationResponse cr;
c@84 148 VampnProto::readConfigurationResponse(cr,
c@84 149 reader.getResponse().getConfigure(),
c@84 150 m_mapper);
c@84 151
c@84 152 return cr.outputs;
c@81 153 };
c@80 154
c@80 155 virtual
c@80 156 Vamp::Plugin::FeatureSet
c@80 157 process(PiperStubPlugin *plugin,
c@84 158 std::vector<std::vector<float> > inputBuffers,
c@84 159 Vamp::RealTime timestamp) {
c@84 160
c@84 161 if (!m_process) {
c@84 162 throw std::runtime_error("Piper server failed to start");
c@84 163 }
c@84 164
c@84 165 Vamp::HostExt::ProcessRequest request;
c@84 166 request.plugin = plugin;
c@84 167 request.inputBuffers = inputBuffers;
c@84 168 request.timestamp = timestamp;
c@84 169
c@84 170 capnp::MallocMessageBuilder message;
c@84 171 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 172
c@84 173 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@84 174 ReqId id = getId();
c@84 175 builder.getId().setNumber(id);
c@84 176
c@84 177 auto arr = messageToFlatArray(message);
c@84 178 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 179
c@84 180 QByteArray buffer = readResponseBuffer();
c@87 181 auto karr = toKJArray(buffer);
c@87 182 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 183 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 184
c@84 185 //!!! handle (explicit) error case
c@84 186
c@84 187 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
c@84 188
c@84 189 Vamp::HostExt::ProcessResponse pr;
c@84 190 VampnProto::readProcessResponse(pr,
c@84 191 reader.getResponse().getProcess(),
c@84 192 m_mapper);
c@84 193
c@84 194 return pr.features;
c@84 195 }
c@80 196
c@80 197 virtual Vamp::Plugin::FeatureSet
c@84 198 finish(PiperStubPlugin *plugin) {
c@84 199
c@84 200 if (!m_process) {
c@84 201 throw std::runtime_error("Piper server failed to start");
c@84 202 }
c@84 203
c@84 204 Vamp::HostExt::FinishRequest request;
c@84 205 request.plugin = plugin;
c@84 206
c@84 207 capnp::MallocMessageBuilder message;
c@84 208 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 209
c@84 210 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@84 211 ReqId id = getId();
c@84 212 builder.getId().setNumber(id);
c@84 213
c@84 214 auto arr = messageToFlatArray(message);
c@84 215 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 216
c@84 217 QByteArray buffer = readResponseBuffer();
c@87 218 auto karr = toKJArray(buffer);
c@87 219 capnp::FlatArrayMessageReader responseMessage(karr);
c@84 220 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 221
c@84 222 //!!! handle (explicit) error case
c@84 223
c@84 224 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
c@84 225
c@84 226 Vamp::HostExt::ProcessResponse pr;
c@84 227 VampnProto::readFinishResponse(pr,
c@84 228 reader.getResponse().getFinish(),
c@84 229 m_mapper);
c@84 230
c@84 231 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@84 232 delete plugin;
c@84 233
c@84 234 return pr.features;
c@84 235 }
c@80 236
c@80 237 private:
c@83 238 QProcess *m_process;
c@80 239 AssignedPluginHandleMapper m_mapper;
c@84 240 ReqId getId() {
c@81 241 //!!! todo: mutex
c@81 242 static ReqId m_nextId = 0;
c@81 243 return m_nextId++;
c@81 244 }
c@84 245
c@87 246 kj::Array<capnp::word>
c@87 247 toKJArray(QByteArray qarr) {
c@87 248 // We could do this whole thing with fewer copies, but let's
c@87 249 // see whether it matters first
c@84 250 size_t wordSize = sizeof(capnp::word);
c@87 251 size_t words = qarr.size() / wordSize;
c@87 252 cerr << "converting " << words << " words (" << (words * wordSize) << " bytes)" << endl;
c@87 253 kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
c@87 254 memcpy(karr.begin(), qarr.data(), words * wordSize);
c@87 255 return karr;
c@84 256 }
c@84 257
c@84 258 QByteArray
c@84 259 readResponseBuffer() {
c@84 260
c@84 261 QByteArray buffer;
c@84 262 size_t wordSize = sizeof(capnp::word);
c@84 263 bool complete = false;
c@84 264
c@84 265 while (!complete) {
c@84 266
c@84 267 m_process->waitForReadyRead(1000);
c@84 268 qint64 byteCount = m_process->bytesAvailable();
c@84 269 qint64 wordCount = byteCount / wordSize;
c@84 270
c@84 271 if (!wordCount) {
c@84 272 if (m_process->state() == QProcess::NotRunning) {
c@84 273 cerr << "ERROR: Subprocess exited: Load failed" << endl;
c@84 274 throw std::runtime_error("Piper server exited unexpectedly");
c@84 275 }
c@84 276 } else {
c@84 277 buffer.append(m_process->read(wordCount * wordSize));
c@84 278 size_t haveWords = buffer.size() / wordSize;
c@84 279 size_t expectedWords =
c@87 280 capnp::expectedSizeInWordsFromPrefix(toKJArray(buffer));
c@84 281
c@84 282 cerr << "haveWords = " << haveWords << ", expectedWords = " << expectedWords << endl;
c@84 283
c@84 284 if (haveWords >= expectedWords) {
c@84 285 if (haveWords > expectedWords) {
c@84 286 cerr << "WARNING: obtained more data than expected ("
c@84 287 << haveWords << " words, expected " << expectedWords
c@84 288 << ")" << endl;
c@84 289 }
c@84 290 complete = true;
c@84 291 }
c@84 292 }
c@84 293 }
c@87 294 /*
c@85 295 cerr << "buffer = ";
c@85 296 for (int i = 0; i < buffer.size(); ++i) {
c@85 297 if (i % 16 == 0) cerr << "\n";
c@85 298 cerr << int(buffer[i]) << " ";
c@85 299 }
c@85 300 cerr << "\n";
c@87 301 */
c@84 302 return buffer;
c@84 303 }
c@84 304
c@84 305 void
c@84 306 checkResponseType(const RpcResponse::Reader &r,
c@84 307 RpcResponse::Response::Which type,
c@84 308 ReqId id) {
c@84 309
c@84 310 if (r.getResponse().which() != type) {
c@84 311 throw std::runtime_error("Wrong response type");
c@84 312 }
c@84 313 if (ReqId(r.getId().getNumber()) != id) {
c@84 314 throw std::runtime_error("Wrong response id");
c@84 315 }
c@84 316 }
c@80 317 };
c@80 318
c@80 319 }
c@80 320
c@84 321 int main(int, char **)
c@84 322 {
c@84 323 piper::PiperClient client;
c@84 324 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
c@84 325 if (!plugin->initialise(1, 4, 4)) {
c@84 326 cerr << "initialisation failed" << endl;
c@84 327 } else {
c@84 328 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
c@84 329 float *bd = buf.data();
c@84 330 Vamp::Plugin::FeatureSet features = plugin->process
c@84 331 (&bd, Vamp::RealTime::zeroTime);
c@84 332 cerr << "results for output 0:" << endl;
c@84 333 auto fl(features[0]);
c@84 334 for (const auto &f: fl) {
c@84 335 cerr << f.values[0] << endl;
c@84 336 }
c@84 337 }
c@87 338 //!!! todo: make it possible to do both of the following --
c@87 339 (void)plugin->getRemainingFeatures();
c@87 340 // delete plugin;
c@87 341 //!!! -- and also implement reset(), which will need to reconstruct internally
c@84 342 }
c@84 343