annotate vamp-client/client.cpp @ 85:1b7c11bc5a88

Debug out
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 12 Oct 2016 12:51:29 +0100
parents db9a6ab618bc
children 7a77a374b6b2
rev   line source
c@78 1
c@78 2 #include "stub.h"
c@78 3
c@80 4 #include "vamp-capnp/VampnProto.h"
c@80 5
c@80 6 #include "vamp-support/AssignedPluginHandleMapper.h"
c@80 7
c@83 8 #include <QProcess>
c@83 9
c@83 10 #include <stdexcept>
c@83 11
c@83 12 using std::cerr;
c@83 13 using std::endl;
c@83 14
c@82 15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so
c@82 16 // we're using pipes and the server is completely synchronous,
c@82 17 // handling only one call at once. Our PiperClient will fire off a
c@82 18 // QProcess and refer to its io device. Each request message is
c@82 19 // serialised into memory using capnp::MallocMessageBuilder and
c@82 20 // shunted into the process pipe; we then wait for some bytes to come
c@82 21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when
c@82 22 // a whole message is available, reading only that amount from the
c@82 23 // device and using FlatArrayMessageReader to convert to a response
c@82 24 // message. If the response message's id does not match the request
c@82 25 // message's, then the server has gone wrong (it should never be
c@82 26 // servicing more than one request at a time).
c@82 27
c@82 28 // Next level: Capnp RPC, but I want to get the first level to work
c@83 29 // first, not least because the server already exists.
c@82 30
c@81 31 namespace piper { //!!! probably something different
c@80 32
c@80 33 class PiperClient : public PiperClientBase
c@80 34 {
c@81 35 // unsigned to avoid undefined behaviour on possible wrap
c@81 36 typedef uint32_t ReqId;
c@81 37
c@80 38 public:
c@83 39 PiperClient() {
c@83 40 m_process = new QProcess();
c@83 41 m_process->setReadChannel(QProcess::StandardOutput);
c@83 42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
c@83 43 m_process->start("../bin/piper-vamp-server"); //!!!
c@83 44 if (!m_process->waitForStarted()) {
c@83 45 cerr << "server failed to start" << endl;
c@83 46 delete m_process;
c@83 47 m_process = 0;
c@83 48 }
c@83 49 }
c@81 50
c@83 51 ~PiperClient() {
c@83 52 if (m_process) {
c@83 53 if (m_process->state() != QProcess::NotRunning) {
c@83 54 m_process->close();
c@83 55 m_process->waitForFinished();
c@83 56 }
c@83 57 delete m_process;
c@83 58 }
c@83 59 }
c@81 60
c@83 61 //!!! obviously, factor out all repetitive guff
c@84 62
c@81 63 Vamp::Plugin *
c@81 64 load(std::string key, float inputSampleRate, int adapterFlags) {
c@81 65
c@83 66 if (!m_process) {
c@83 67 throw std::runtime_error("Piper server failed to start");
c@83 68 }
c@83 69
c@81 70 Vamp::HostExt::LoadRequest request;
c@81 71 request.pluginKey = key;
c@81 72 request.inputSampleRate = inputSampleRate;
c@81 73 request.adapterFlags = adapterFlags;
c@81 74
c@84 75 capnp::MallocMessageBuilder message;
c@81 76 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@81 77
c@81 78 VampnProto::buildRpcRequest_Load(builder, request);
c@81 79 ReqId id = getId();
c@81 80 builder.getId().setNumber(id);
c@83 81
c@85 82 cerr << "id = " << id << endl;
c@85 83
c@83 84 auto arr = messageToFlatArray(message);
c@83 85 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@83 86
c@84 87 //!!! ... --> will also need some way to kill this process
c@84 88 //!!! (from another thread)
c@84 89
c@84 90 QByteArray buffer = readResponseBuffer();
c@84 91 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 92 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 93
c@84 94 //!!! handle (explicit) error case
c@84 95
c@84 96 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
c@84 97
c@84 98 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@84 99
c@84 100 Vamp::HostExt::PluginStaticData psd;
c@84 101 Vamp::HostExt::PluginConfiguration defaultConfig;
c@84 102 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@84 103 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@84 104
c@84 105 Vamp::Plugin *plugin = new PiperStubPlugin(this,
c@84 106 inputSampleRate,
c@84 107 psd,
c@84 108 defaultConfig);
c@84 109
c@84 110 m_mapper.addPlugin(lr.getHandle(), plugin);
c@84 111
c@84 112 return plugin;
c@81 113 };
c@80 114
c@80 115 virtual
c@80 116 Vamp::Plugin::OutputList
c@80 117 configure(PiperStubPlugin *plugin,
c@80 118 Vamp::HostExt::PluginConfiguration config) {
c@80 119
c@83 120 if (!m_process) {
c@83 121 throw std::runtime_error("Piper server failed to start");
c@83 122 }
c@83 123
c@80 124 Vamp::HostExt::ConfigurationRequest request;
c@80 125 request.plugin = plugin;
c@80 126 request.configuration = config;
c@80 127
c@84 128 capnp::MallocMessageBuilder message;
c@80 129 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@80 130
c@80 131 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@81 132 ReqId id = getId();
c@81 133 builder.getId().setNumber(id);
c@84 134
c@84 135 auto arr = messageToFlatArray(message);
c@84 136 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 137
c@84 138 QByteArray buffer = readResponseBuffer();
c@84 139 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 140 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@80 141
c@84 142 //!!! handle (explicit) error case
c@84 143
c@84 144 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
c@84 145
c@84 146 Vamp::HostExt::ConfigurationResponse cr;
c@84 147 VampnProto::readConfigurationResponse(cr,
c@84 148 reader.getResponse().getConfigure(),
c@84 149 m_mapper);
c@84 150
c@84 151 return cr.outputs;
c@81 152 };
c@80 153
c@80 154 virtual
c@80 155 Vamp::Plugin::FeatureSet
c@80 156 process(PiperStubPlugin *plugin,
c@84 157 std::vector<std::vector<float> > inputBuffers,
c@84 158 Vamp::RealTime timestamp) {
c@84 159
c@84 160 if (!m_process) {
c@84 161 throw std::runtime_error("Piper server failed to start");
c@84 162 }
c@84 163
c@84 164 Vamp::HostExt::ProcessRequest request;
c@84 165 request.plugin = plugin;
c@84 166 request.inputBuffers = inputBuffers;
c@84 167 request.timestamp = timestamp;
c@84 168
c@84 169 capnp::MallocMessageBuilder message;
c@84 170 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 171
c@84 172 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@84 173 ReqId id = getId();
c@84 174 builder.getId().setNumber(id);
c@84 175
c@84 176 auto arr = messageToFlatArray(message);
c@84 177 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 178
c@84 179 QByteArray buffer = readResponseBuffer();
c@84 180 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 181 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 182
c@84 183 //!!! handle (explicit) error case
c@84 184
c@84 185 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
c@84 186
c@84 187 Vamp::HostExt::ProcessResponse pr;
c@84 188 VampnProto::readProcessResponse(pr,
c@84 189 reader.getResponse().getProcess(),
c@84 190 m_mapper);
c@84 191
c@84 192 return pr.features;
c@84 193 }
c@80 194
c@80 195 virtual Vamp::Plugin::FeatureSet
c@84 196 finish(PiperStubPlugin *plugin) {
c@84 197
c@84 198 if (!m_process) {
c@84 199 throw std::runtime_error("Piper server failed to start");
c@84 200 }
c@84 201
c@84 202 Vamp::HostExt::FinishRequest request;
c@84 203 request.plugin = plugin;
c@84 204
c@84 205 capnp::MallocMessageBuilder message;
c@84 206 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 207
c@84 208 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@84 209 ReqId id = getId();
c@84 210 builder.getId().setNumber(id);
c@84 211
c@84 212 auto arr = messageToFlatArray(message);
c@84 213 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 214
c@84 215 QByteArray buffer = readResponseBuffer();
c@84 216 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 217 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 218
c@84 219 //!!! handle (explicit) error case
c@84 220
c@84 221 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
c@84 222
c@84 223 Vamp::HostExt::ProcessResponse pr;
c@84 224 VampnProto::readFinishResponse(pr,
c@84 225 reader.getResponse().getFinish(),
c@84 226 m_mapper);
c@84 227
c@84 228 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@84 229 delete plugin;
c@84 230
c@84 231 return pr.features;
c@84 232 }
c@80 233
c@80 234 private:
c@83 235 QProcess *m_process;
c@80 236 AssignedPluginHandleMapper m_mapper;
c@84 237 ReqId getId() {
c@81 238 //!!! todo: mutex
c@81 239 static ReqId m_nextId = 0;
c@81 240 return m_nextId++;
c@81 241 }
c@84 242
c@84 243 kj::ArrayPtr<const capnp::word>
c@84 244 toArrayPtr(QByteArray arr) {
c@84 245 size_t wordSize = sizeof(capnp::word);
c@84 246 capnp::word *dptr = reinterpret_cast<capnp::word *>(arr.data());
c@84 247 kj::ArrayPtr<const capnp::word> kptr(dptr, arr.size() / wordSize);
c@84 248 return kptr;
c@84 249 }
c@84 250
c@84 251 QByteArray
c@84 252 readResponseBuffer() {
c@84 253
c@84 254 QByteArray buffer;
c@84 255 size_t wordSize = sizeof(capnp::word);
c@84 256 bool complete = false;
c@84 257
c@84 258 while (!complete) {
c@84 259
c@84 260 m_process->waitForReadyRead(1000);
c@84 261 qint64 byteCount = m_process->bytesAvailable();
c@84 262 qint64 wordCount = byteCount / wordSize;
c@84 263
c@84 264 if (!wordCount) {
c@84 265 if (m_process->state() == QProcess::NotRunning) {
c@84 266 cerr << "ERROR: Subprocess exited: Load failed" << endl;
c@84 267 throw std::runtime_error("Piper server exited unexpectedly");
c@84 268 }
c@84 269 } else {
c@84 270 buffer.append(m_process->read(wordCount * wordSize));
c@84 271 size_t haveWords = buffer.size() / wordSize;
c@84 272 size_t expectedWords =
c@84 273 capnp::expectedSizeInWordsFromPrefix(toArrayPtr(buffer));
c@84 274
c@84 275 cerr << "haveWords = " << haveWords << ", expectedWords = " << expectedWords << endl;
c@84 276
c@84 277 if (haveWords >= expectedWords) {
c@84 278 if (haveWords > expectedWords) {
c@84 279 cerr << "WARNING: obtained more data than expected ("
c@84 280 << haveWords << " words, expected " << expectedWords
c@84 281 << ")" << endl;
c@84 282 }
c@84 283 complete = true;
c@84 284 }
c@84 285 }
c@84 286 }
c@84 287
c@85 288 cerr << "buffer = ";
c@85 289 for (int i = 0; i < buffer.size(); ++i) {
c@85 290 if (i % 16 == 0) cerr << "\n";
c@85 291 cerr << int(buffer[i]) << " ";
c@85 292 }
c@85 293 cerr << "\n";
c@85 294
c@84 295 return buffer;
c@84 296 }
c@84 297
c@84 298 void
c@84 299 checkResponseType(const RpcResponse::Reader &r,
c@84 300 RpcResponse::Response::Which type,
c@84 301 ReqId id) {
c@84 302
c@84 303 if (r.getResponse().which() != type) {
c@84 304 throw std::runtime_error("Wrong response type");
c@84 305 }
c@84 306 if (ReqId(r.getId().getNumber()) != id) {
c@84 307 throw std::runtime_error("Wrong response id");
c@84 308 }
c@84 309 }
c@80 310 };
c@80 311
c@80 312 }
c@80 313
c@84 314 int main(int, char **)
c@84 315 {
c@84 316 piper::PiperClient client;
c@84 317 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
c@84 318 if (!plugin->initialise(1, 4, 4)) {
c@84 319 cerr << "initialisation failed" << endl;
c@84 320 } else {
c@84 321 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
c@84 322 float *bd = buf.data();
c@84 323 Vamp::Plugin::FeatureSet features = plugin->process
c@84 324 (&bd, Vamp::RealTime::zeroTime);
c@84 325 cerr << "results for output 0:" << endl;
c@84 326 auto fl(features[0]);
c@84 327 for (const auto &f: fl) {
c@84 328 cerr << f.values[0] << endl;
c@84 329 }
c@84 330 }
c@84 331 delete plugin;
c@84 332 }
c@84 333