c@78: c@78: #include "stub.h" c@78: c@80: #include "vamp-capnp/VampnProto.h" c@80: c@80: #include "vamp-support/AssignedPluginHandleMapper.h" c@80: c@83: #include c@83: c@83: #include c@83: c@83: using std::cerr; c@83: using std::endl; c@83: c@82: // First cut plan: this is to be client-qt.cpp, using a QProcess, so c@82: // we're using pipes and the server is completely synchronous, c@82: // handling only one call at once. Our PiperClient will fire off a c@82: // QProcess and refer to its io device. Each request message is c@82: // serialised into memory using capnp::MallocMessageBuilder and c@82: // shunted into the process pipe; we then wait for some bytes to come c@82: // back and use capnp::expectedSizeInWordsFromPrefix to work out when c@82: // a whole message is available, reading only that amount from the c@82: // device and using FlatArrayMessageReader to convert to a response c@82: // message. If the response message's id does not match the request c@82: // message's, then the server has gone wrong (it should never be c@82: // servicing more than one request at a time). c@82: c@82: // Next level: Capnp RPC, but I want to get the first level to work c@83: // first, not least because the server already exists. c@82: c@81: namespace piper { //!!! probably something different c@80: c@80: class PiperClient : public PiperClientBase c@80: { c@81: // unsigned to avoid undefined behaviour on possible wrap c@81: typedef uint32_t ReqId; c@81: c@80: public: c@83: PiperClient() { c@83: m_process = new QProcess(); c@83: m_process->setReadChannel(QProcess::StandardOutput); c@83: m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel); c@83: m_process->start("../bin/piper-vamp-server"); //!!! c@83: if (!m_process->waitForStarted()) { c@83: cerr << "server failed to start" << endl; c@83: delete m_process; c@83: m_process = 0; c@83: } c@83: } c@81: c@83: ~PiperClient() { c@83: if (m_process) { c@83: if (m_process->state() != QProcess::NotRunning) { c@83: m_process->close(); c@83: m_process->waitForFinished(); c@83: } c@83: delete m_process; c@83: } c@83: } c@81: c@83: //!!! obviously, factor out all repetitive guff c@84: c@81: Vamp::Plugin * c@81: load(std::string key, float inputSampleRate, int adapterFlags) { c@81: c@83: if (!m_process) { c@83: throw std::runtime_error("Piper server failed to start"); c@83: } c@83: c@81: Vamp::HostExt::LoadRequest request; c@81: request.pluginKey = key; c@81: request.inputSampleRate = inputSampleRate; c@81: request.adapterFlags = adapterFlags; c@81: c@84: capnp::MallocMessageBuilder message; c@81: RpcRequest::Builder builder = message.initRoot(); c@81: c@81: VampnProto::buildRpcRequest_Load(builder, request); c@81: ReqId id = getId(); c@81: builder.getId().setNumber(id); c@83: c@83: auto arr = messageToFlatArray(message); c@83: m_process->write(arr.asChars().begin(), arr.asChars().size()); c@83: c@84: //!!! ... --> will also need some way to kill this process c@84: //!!! (from another thread) c@84: c@84: QByteArray buffer = readResponseBuffer(); c@84: capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer)); c@84: RpcResponse::Reader reader = responseMessage.getRoot(); c@84: c@84: //!!! handle (explicit) error case c@84: c@84: checkResponseType(reader, RpcResponse::Response::Which::LOAD, id); c@84: c@84: const LoadResponse::Reader &lr = reader.getResponse().getLoad(); c@84: c@84: Vamp::HostExt::PluginStaticData psd; c@84: Vamp::HostExt::PluginConfiguration defaultConfig; c@84: VampnProto::readExtractorStaticData(psd, lr.getStaticData()); c@84: VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration()); c@84: c@84: Vamp::Plugin *plugin = new PiperStubPlugin(this, c@84: inputSampleRate, c@84: psd, c@84: defaultConfig); c@84: c@84: m_mapper.addPlugin(lr.getHandle(), plugin); c@84: c@84: return plugin; c@81: }; c@80: c@80: virtual c@80: Vamp::Plugin::OutputList c@80: configure(PiperStubPlugin *plugin, c@80: Vamp::HostExt::PluginConfiguration config) { c@80: c@83: if (!m_process) { c@83: throw std::runtime_error("Piper server failed to start"); c@83: } c@83: c@80: Vamp::HostExt::ConfigurationRequest request; c@80: request.plugin = plugin; c@80: request.configuration = config; c@80: c@84: capnp::MallocMessageBuilder message; c@80: RpcRequest::Builder builder = message.initRoot(); c@80: c@80: VampnProto::buildRpcRequest_Configure(builder, request, m_mapper); c@81: ReqId id = getId(); c@81: builder.getId().setNumber(id); c@84: c@84: auto arr = messageToFlatArray(message); c@84: m_process->write(arr.asChars().begin(), arr.asChars().size()); c@84: c@84: QByteArray buffer = readResponseBuffer(); c@84: capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer)); c@84: RpcResponse::Reader reader = responseMessage.getRoot(); c@80: c@84: //!!! handle (explicit) error case c@84: c@84: checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id); c@84: c@84: Vamp::HostExt::ConfigurationResponse cr; c@84: VampnProto::readConfigurationResponse(cr, c@84: reader.getResponse().getConfigure(), c@84: m_mapper); c@84: c@84: return cr.outputs; c@81: }; c@80: c@80: virtual c@80: Vamp::Plugin::FeatureSet c@80: process(PiperStubPlugin *plugin, c@84: std::vector > inputBuffers, c@84: Vamp::RealTime timestamp) { c@84: c@84: if (!m_process) { c@84: throw std::runtime_error("Piper server failed to start"); c@84: } c@84: c@84: Vamp::HostExt::ProcessRequest request; c@84: request.plugin = plugin; c@84: request.inputBuffers = inputBuffers; c@84: request.timestamp = timestamp; c@84: c@84: capnp::MallocMessageBuilder message; c@84: RpcRequest::Builder builder = message.initRoot(); c@84: c@84: VampnProto::buildRpcRequest_Process(builder, request, m_mapper); c@84: ReqId id = getId(); c@84: builder.getId().setNumber(id); c@84: c@84: auto arr = messageToFlatArray(message); c@84: m_process->write(arr.asChars().begin(), arr.asChars().size()); c@84: c@84: QByteArray buffer = readResponseBuffer(); c@84: capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer)); c@84: RpcResponse::Reader reader = responseMessage.getRoot(); c@84: c@84: //!!! handle (explicit) error case c@84: c@84: checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id); c@84: c@84: Vamp::HostExt::ProcessResponse pr; c@84: VampnProto::readProcessResponse(pr, c@84: reader.getResponse().getProcess(), c@84: m_mapper); c@84: c@84: return pr.features; c@84: } c@80: c@80: virtual Vamp::Plugin::FeatureSet c@84: finish(PiperStubPlugin *plugin) { c@84: c@84: if (!m_process) { c@84: throw std::runtime_error("Piper server failed to start"); c@84: } c@84: c@84: Vamp::HostExt::FinishRequest request; c@84: request.plugin = plugin; c@84: c@84: capnp::MallocMessageBuilder message; c@84: RpcRequest::Builder builder = message.initRoot(); c@84: c@84: VampnProto::buildRpcRequest_Finish(builder, request, m_mapper); c@84: ReqId id = getId(); c@84: builder.getId().setNumber(id); c@84: c@84: auto arr = messageToFlatArray(message); c@84: m_process->write(arr.asChars().begin(), arr.asChars().size()); c@84: c@84: QByteArray buffer = readResponseBuffer(); c@84: capnp::FlatArrayMessageReader responseMessage(toArrayPtr(buffer)); c@84: RpcResponse::Reader reader = responseMessage.getRoot(); c@84: c@84: //!!! handle (explicit) error case c@84: c@84: checkResponseType(reader, RpcResponse::Response::Which::FINISH, id); c@84: c@84: Vamp::HostExt::ProcessResponse pr; c@84: VampnProto::readFinishResponse(pr, c@84: reader.getResponse().getFinish(), c@84: m_mapper); c@84: c@84: m_mapper.removePlugin(m_mapper.pluginToHandle(plugin)); c@84: delete plugin; c@84: c@84: return pr.features; c@84: } c@80: c@80: private: c@83: QProcess *m_process; c@80: AssignedPluginHandleMapper m_mapper; c@84: ReqId getId() { c@81: //!!! todo: mutex c@81: static ReqId m_nextId = 0; c@81: return m_nextId++; c@81: } c@84: c@84: kj::ArrayPtr c@84: toArrayPtr(QByteArray arr) { c@84: size_t wordSize = sizeof(capnp::word); c@84: capnp::word *dptr = reinterpret_cast(arr.data()); c@84: kj::ArrayPtr kptr(dptr, arr.size() / wordSize); c@84: return kptr; c@84: } c@84: c@84: QByteArray c@84: readResponseBuffer() { c@84: c@84: QByteArray buffer; c@84: size_t wordSize = sizeof(capnp::word); c@84: bool complete = false; c@84: c@84: while (!complete) { c@84: c@84: m_process->waitForReadyRead(1000); c@84: qint64 byteCount = m_process->bytesAvailable(); c@84: qint64 wordCount = byteCount / wordSize; c@84: c@84: if (!wordCount) { c@84: if (m_process->state() == QProcess::NotRunning) { c@84: cerr << "ERROR: Subprocess exited: Load failed" << endl; c@84: throw std::runtime_error("Piper server exited unexpectedly"); c@84: } c@84: } else { c@84: buffer.append(m_process->read(wordCount * wordSize)); c@84: size_t haveWords = buffer.size() / wordSize; c@84: size_t expectedWords = c@84: capnp::expectedSizeInWordsFromPrefix(toArrayPtr(buffer)); c@84: c@84: cerr << "haveWords = " << haveWords << ", expectedWords = " << expectedWords << endl; c@84: c@84: if (haveWords >= expectedWords) { c@84: if (haveWords > expectedWords) { c@84: cerr << "WARNING: obtained more data than expected (" c@84: << haveWords << " words, expected " << expectedWords c@84: << ")" << endl; c@84: } c@84: complete = true; c@84: } c@84: } c@84: } c@84: c@84: return buffer; c@84: } c@84: c@84: void c@84: checkResponseType(const RpcResponse::Reader &r, c@84: RpcResponse::Response::Which type, c@84: ReqId id) { c@84: c@84: if (r.getResponse().which() != type) { c@84: throw std::runtime_error("Wrong response type"); c@84: } c@84: if (ReqId(r.getId().getNumber()) != id) { c@84: throw std::runtime_error("Wrong response id"); c@84: } c@84: } c@80: }; c@80: c@80: } c@80: c@84: int main(int, char **) c@84: { c@84: piper::PiperClient client; c@84: Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0); c@84: if (!plugin->initialise(1, 4, 4)) { c@84: cerr << "initialisation failed" << endl; c@84: } else { c@84: std::vector buf = { 1.0, -1.0, 1.0, -1.0 }; c@84: float *bd = buf.data(); c@84: Vamp::Plugin::FeatureSet features = plugin->process c@84: (&bd, Vamp::RealTime::zeroTime); c@84: cerr << "results for output 0:" << endl; c@84: auto fl(features[0]); c@84: for (const auto &f: fl) { c@84: cerr << f.values[0] << endl; c@84: } c@84: } c@84: delete plugin; c@84: } c@84: