annotate vamp-client/client.cpp @ 84:db9a6ab618bc

Client builds; does not run
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 12 Oct 2016 11:59:57 +0100
parents 154e94ea84d4
children 1b7c11bc5a88
rev   line source
c@78 1
#include "stub.h"

#include "vamp-capnp/VampnProto.h"

#include "vamp-support/AssignedPluginHandleMapper.h"

#include <QProcess>

#include <iostream>
#include <memory>
#include <stdexcept>
c@83 11
c@83 12 using std::cerr;
c@83 13 using std::endl;
c@83 14
c@82 15 // First cut plan: this is to be client-qt.cpp, using a QProcess, so
c@82 16 // we're using pipes and the server is completely synchronous,
c@82 17 // handling only one call at once. Our PiperClient will fire off a
c@82 18 // QProcess and refer to its io device. Each request message is
c@82 19 // serialised into memory using capnp::MallocMessageBuilder and
c@82 20 // shunted into the process pipe; we then wait for some bytes to come
c@82 21 // back and use capnp::expectedSizeInWordsFromPrefix to work out when
c@82 22 // a whole message is available, reading only that amount from the
c@82 23 // device and using FlatArrayMessageReader to convert to a response
c@82 24 // message. If the response message's id does not match the request
c@82 25 // message's, then the server has gone wrong (it should never be
c@82 26 // servicing more than one request at a time).
c@82 27
c@82 28 // Next level: Capnp RPC, but I want to get the first level to work
c@83 29 // first, not least because the server already exists.
c@82 30
c@81 31 namespace piper { //!!! probably something different
c@80 32
c@80 33 class PiperClient : public PiperClientBase
c@80 34 {
c@81 35 // unsigned to avoid undefined behaviour on possible wrap
c@81 36 typedef uint32_t ReqId;
c@81 37
c@80 38 public:
c@83 39 PiperClient() {
c@83 40 m_process = new QProcess();
c@83 41 m_process->setReadChannel(QProcess::StandardOutput);
c@83 42 m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
c@83 43 m_process->start("../bin/piper-vamp-server"); //!!!
c@83 44 if (!m_process->waitForStarted()) {
c@83 45 cerr << "server failed to start" << endl;
c@83 46 delete m_process;
c@83 47 m_process = 0;
c@83 48 }
c@83 49 }
c@81 50
c@83 51 ~PiperClient() {
c@83 52 if (m_process) {
c@83 53 if (m_process->state() != QProcess::NotRunning) {
c@83 54 m_process->close();
c@83 55 m_process->waitForFinished();
c@83 56 }
c@83 57 delete m_process;
c@83 58 }
c@83 59 }
c@81 60
c@83 61 //!!! obviously, factor out all repetitive guff
c@84 62
c@81 63 Vamp::Plugin *
c@81 64 load(std::string key, float inputSampleRate, int adapterFlags) {
c@81 65
c@83 66 if (!m_process) {
c@83 67 throw std::runtime_error("Piper server failed to start");
c@83 68 }
c@83 69
c@81 70 Vamp::HostExt::LoadRequest request;
c@81 71 request.pluginKey = key;
c@81 72 request.inputSampleRate = inputSampleRate;
c@81 73 request.adapterFlags = adapterFlags;
c@81 74
c@84 75 capnp::MallocMessageBuilder message;
c@81 76 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@81 77
c@81 78 VampnProto::buildRpcRequest_Load(builder, request);
c@81 79 ReqId id = getId();
c@81 80 builder.getId().setNumber(id);
c@83 81
c@83 82 auto arr = messageToFlatArray(message);
c@83 83 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@83 84
c@84 85 //!!! ... --> will also need some way to kill this process
c@84 86 //!!! (from another thread)
c@84 87
c@84 88 QByteArray buffer = readResponseBuffer();
c@84 89 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 90 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 91
c@84 92 //!!! handle (explicit) error case
c@84 93
c@84 94 checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
c@84 95
c@84 96 const LoadResponse::Reader &lr = reader.getResponse().getLoad();
c@84 97
c@84 98 Vamp::HostExt::PluginStaticData psd;
c@84 99 Vamp::HostExt::PluginConfiguration defaultConfig;
c@84 100 VampnProto::readExtractorStaticData(psd, lr.getStaticData());
c@84 101 VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
c@84 102
c@84 103 Vamp::Plugin *plugin = new PiperStubPlugin(this,
c@84 104 inputSampleRate,
c@84 105 psd,
c@84 106 defaultConfig);
c@84 107
c@84 108 m_mapper.addPlugin(lr.getHandle(), plugin);
c@84 109
c@84 110 return plugin;
c@81 111 };
c@80 112
c@80 113 virtual
c@80 114 Vamp::Plugin::OutputList
c@80 115 configure(PiperStubPlugin *plugin,
c@80 116 Vamp::HostExt::PluginConfiguration config) {
c@80 117
c@83 118 if (!m_process) {
c@83 119 throw std::runtime_error("Piper server failed to start");
c@83 120 }
c@83 121
c@80 122 Vamp::HostExt::ConfigurationRequest request;
c@80 123 request.plugin = plugin;
c@80 124 request.configuration = config;
c@80 125
c@84 126 capnp::MallocMessageBuilder message;
c@80 127 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@80 128
c@80 129 VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
c@81 130 ReqId id = getId();
c@81 131 builder.getId().setNumber(id);
c@84 132
c@84 133 auto arr = messageToFlatArray(message);
c@84 134 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 135
c@84 136 QByteArray buffer = readResponseBuffer();
c@84 137 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 138 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@80 139
c@84 140 //!!! handle (explicit) error case
c@84 141
c@84 142 checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
c@84 143
c@84 144 Vamp::HostExt::ConfigurationResponse cr;
c@84 145 VampnProto::readConfigurationResponse(cr,
c@84 146 reader.getResponse().getConfigure(),
c@84 147 m_mapper);
c@84 148
c@84 149 return cr.outputs;
c@81 150 };
c@80 151
c@80 152 virtual
c@80 153 Vamp::Plugin::FeatureSet
c@80 154 process(PiperStubPlugin *plugin,
c@84 155 std::vector<std::vector<float> > inputBuffers,
c@84 156 Vamp::RealTime timestamp) {
c@84 157
c@84 158 if (!m_process) {
c@84 159 throw std::runtime_error("Piper server failed to start");
c@84 160 }
c@84 161
c@84 162 Vamp::HostExt::ProcessRequest request;
c@84 163 request.plugin = plugin;
c@84 164 request.inputBuffers = inputBuffers;
c@84 165 request.timestamp = timestamp;
c@84 166
c@84 167 capnp::MallocMessageBuilder message;
c@84 168 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 169
c@84 170 VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
c@84 171 ReqId id = getId();
c@84 172 builder.getId().setNumber(id);
c@84 173
c@84 174 auto arr = messageToFlatArray(message);
c@84 175 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 176
c@84 177 QByteArray buffer = readResponseBuffer();
c@84 178 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 179 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 180
c@84 181 //!!! handle (explicit) error case
c@84 182
c@84 183 checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
c@84 184
c@84 185 Vamp::HostExt::ProcessResponse pr;
c@84 186 VampnProto::readProcessResponse(pr,
c@84 187 reader.getResponse().getProcess(),
c@84 188 m_mapper);
c@84 189
c@84 190 return pr.features;
c@84 191 }
c@80 192
c@80 193 virtual Vamp::Plugin::FeatureSet
c@84 194 finish(PiperStubPlugin *plugin) {
c@84 195
c@84 196 if (!m_process) {
c@84 197 throw std::runtime_error("Piper server failed to start");
c@84 198 }
c@84 199
c@84 200 Vamp::HostExt::FinishRequest request;
c@84 201 request.plugin = plugin;
c@84 202
c@84 203 capnp::MallocMessageBuilder message;
c@84 204 RpcRequest::Builder builder = message.initRoot<RpcRequest>();
c@84 205
c@84 206 VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
c@84 207 ReqId id = getId();
c@84 208 builder.getId().setNumber(id);
c@84 209
c@84 210 auto arr = messageToFlatArray(message);
c@84 211 m_process->write(arr.asChars().begin(), arr.asChars().size());
c@84 212
c@84 213 QByteArray buffer = readResponseBuffer();
c@84 214 capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer));
c@84 215 RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
c@84 216
c@84 217 //!!! handle (explicit) error case
c@84 218
c@84 219 checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
c@84 220
c@84 221 Vamp::HostExt::ProcessResponse pr;
c@84 222 VampnProto::readFinishResponse(pr,
c@84 223 reader.getResponse().getFinish(),
c@84 224 m_mapper);
c@84 225
c@84 226 m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
c@84 227 delete plugin;
c@84 228
c@84 229 return pr.features;
c@84 230 }
c@80 231
c@80 232 private:
c@83 233 QProcess *m_process;
c@80 234 AssignedPluginHandleMapper m_mapper;
c@84 235 ReqId getId() {
c@81 236 //!!! todo: mutex
c@81 237 static ReqId m_nextId = 0;
c@81 238 return m_nextId++;
c@81 239 }
c@84 240
c@84 241 kj::ArrayPtr<const capnp::word>
c@84 242 toArrayPtr(QByteArray arr) {
c@84 243 size_t wordSize = sizeof(capnp::word);
c@84 244 capnp::word *dptr = reinterpret_cast<capnp::word *>(arr.data());
c@84 245 kj::ArrayPtr<const capnp::word> kptr(dptr, arr.size() / wordSize);
c@84 246 return kptr;
c@84 247 }
c@84 248
c@84 249 QByteArray
c@84 250 readResponseBuffer() {
c@84 251
c@84 252 QByteArray buffer;
c@84 253 size_t wordSize = sizeof(capnp::word);
c@84 254 bool complete = false;
c@84 255
c@84 256 while (!complete) {
c@84 257
c@84 258 m_process->waitForReadyRead(1000);
c@84 259 qint64 byteCount = m_process->bytesAvailable();
c@84 260 qint64 wordCount = byteCount / wordSize;
c@84 261
c@84 262 if (!wordCount) {
c@84 263 if (m_process->state() == QProcess::NotRunning) {
c@84 264 cerr << "ERROR: Subprocess exited: Load failed" << endl;
c@84 265 throw std::runtime_error("Piper server exited unexpectedly");
c@84 266 }
c@84 267 } else {
c@84 268 buffer.append(m_process->read(wordCount * wordSize));
c@84 269 size_t haveWords = buffer.size() / wordSize;
c@84 270 size_t expectedWords =
c@84 271 capnp::expectedSizeInWordsFromPrefix(toArrayPtr(buffer));
c@84 272
c@84 273 cerr << "haveWords = " << haveWords << ", expectedWords = " << expectedWords << endl;
c@84 274
c@84 275 if (haveWords >= expectedWords) {
c@84 276 if (haveWords > expectedWords) {
c@84 277 cerr << "WARNING: obtained more data than expected ("
c@84 278 << haveWords << " words, expected " << expectedWords
c@84 279 << ")" << endl;
c@84 280 }
c@84 281 complete = true;
c@84 282 }
c@84 283 }
c@84 284 }
c@84 285
c@84 286 return buffer;
c@84 287 }
c@84 288
c@84 289 void
c@84 290 checkResponseType(const RpcResponse::Reader &r,
c@84 291 RpcResponse::Response::Which type,
c@84 292 ReqId id) {
c@84 293
c@84 294 if (r.getResponse().which() != type) {
c@84 295 throw std::runtime_error("Wrong response type");
c@84 296 }
c@84 297 if (ReqId(r.getId().getNumber()) != id) {
c@84 298 throw std::runtime_error("Wrong response id");
c@84 299 }
c@84 300 }
c@80 301 };
c@80 302
c@80 303 }
c@80 304
c@84 305 int main(int, char **)
c@84 306 {
c@84 307 piper::PiperClient client;
c@84 308 Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
c@84 309 if (!plugin->initialise(1, 4, 4)) {
c@84 310 cerr << "initialisation failed" << endl;
c@84 311 } else {
c@84 312 std::vector<float> buf = { 1.0, -1.0, 1.0, -1.0 };
c@84 313 float *bd = buf.data();
c@84 314 Vamp::Plugin::FeatureSet features = plugin->process
c@84 315 (&bd, Vamp::RealTime::zeroTime);
c@84 316 cerr << "results for output 0:" << endl;
c@84 317 auto fl(features[0]);
c@84 318 for (const auto &f: fl) {
c@84 319 cerr << f.values[0] << endl;
c@84 320 }
c@84 321 }
c@84 322 delete plugin;
c@84 323 }
c@84 324