diff vamp-client/client.cpp @ 90:6429a99abcad

Split out classes
author Chris Cannam <c.cannam@qmul.ac.uk>
date Thu, 13 Oct 2016 10:17:59 +0100
parents 03ed2e0a6c8f
children 21f8af53eaf0
line wrap: on
line diff
--- a/vamp-client/client.cpp	Wed Oct 12 21:34:21 2016 +0100
+++ b/vamp-client/client.cpp	Thu Oct 13 10:17:59 2016 +0100
@@ -1,330 +1,20 @@
 
-#include "stub.h"
-
-#include "vamp-capnp/VampnProto.h"
-
-#include "vamp-support/AssignedPluginHandleMapper.h"
-
-#include <QProcess>
+#include "PiperStubPlugin.h"
+#include "CapnpMessageCompletenessChecker.h"
+#include "PipedQProcessTransport.h"
+#include "PiperCapnpClient.h"
 
 #include <stdexcept>
 
 using std::cerr;
 using std::endl;
 
-// First cut plan: this is to be client-qt.cpp, using a QProcess, so
-// we're using pipes and the server is completely synchronous,
-// handling only one call at once. Our PiperClient will fire off a
-// QProcess and refer to its io device. Each request message is
-// serialised into memory using capnp::MallocMessageBuilder and
-// shunted into the process pipe; we then wait for some bytes to come
-// back and use capnp::expectedSizeInWordsFromPrefix to work out when
-// a whole message is available, reading only that amount from the
-// device and using FlatArrayMessageReader to convert to a response
-// message. If the response message's id does not match the request
-// message's, then the server has gone wrong (it should never be
-// servicing more than one request at a time).
-
-// Next level: Capnp RPC, but I want to get the first level to work
-// first, not least because the server already exists.
-
-namespace piper { //!!! probably something different
-
-class PiperClient : public PiperClientStubRequirements
-{
-    // unsigned to avoid undefined behaviour on possible wrap
-    typedef uint32_t ReqId;
-    
-public:
-    PiperClient() {
-        m_process = new QProcess();
-        m_process->setReadChannel(QProcess::StandardOutput);
-        m_process->setProcessChannelMode(QProcess::ForwardedErrorChannel);
-        m_process->start("../bin/piper-vamp-server"); //!!!
-        if (!m_process->waitForStarted()) {
-            cerr << "server failed to start" << endl;
-            delete m_process;
-            m_process = 0;
-        }
-    }
-
-    ~PiperClient() {
-        if (m_process) {
-            if (m_process->state() != QProcess::NotRunning) {
-		m_process->closeWriteChannel();
-                m_process->waitForFinished(200);
-                m_process->close();
-                m_process->waitForFinished();
-		cerr << "server exited" << endl;
-            }
-            delete m_process;
-        }
-    }
-
-    //!!! obviously, factor out all repetitive guff
-
-    //!!! list and load are supposed to be called by application code,
-    //!!! but the rest are only supposed to be called by the plugin --
-    //!!! sort out the api here
-    
-    Vamp::Plugin *
-    load(std::string key, float inputSampleRate, int adapterFlags) {
-
-        if (!m_process) {
-            throw std::runtime_error("Piper server failed to start");
-        }
-
-        Vamp::HostExt::LoadRequest request;
-        request.pluginKey = key;
-        request.inputSampleRate = inputSampleRate;
-        request.adapterFlags = adapterFlags;
-
-        capnp::MallocMessageBuilder message;
-        RpcRequest::Builder builder = message.initRoot<RpcRequest>();
-
-        VampnProto::buildRpcRequest_Load(builder, request);
-        ReqId id = getId();
-        builder.getId().setNumber(id);
-
-        auto arr = messageToFlatArray(message);
-        m_process->write(arr.asChars().begin(), arr.asChars().size());
-
-        //!!! ... --> will also need some way to kill this process
-        //!!! (from another thread)
-
-        QByteArray buffer = readResponseBuffer();
-	auto karr = toKJArray(buffer);
-        capnp::FlatArrayMessageReader responseMessage(karr);
-        RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
-
-        //!!! handle (explicit) error case
-
-        checkResponseType(reader, RpcResponse::Response::Which::LOAD, id);
-        
-        const LoadResponse::Reader &lr = reader.getResponse().getLoad();
-
-        Vamp::HostExt::PluginStaticData psd;
-        Vamp::HostExt::PluginConfiguration defaultConfig;
-        VampnProto::readExtractorStaticData(psd, lr.getStaticData());
-        VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration());
-        
-        Vamp::Plugin *plugin = new PiperStubPlugin(this,
-                                                   inputSampleRate,
-                                                   psd,
-                                                   defaultConfig);
-
-        m_mapper.addPlugin(lr.getHandle(), plugin);
-
-        return plugin;
-    };     
-    
-    virtual
-    Vamp::Plugin::OutputList
-    configure(PiperStubPlugin *plugin,
-              Vamp::HostExt::PluginConfiguration config) {
-
-        if (!m_process) {
-            throw std::runtime_error("Piper server failed to start");
-        }
-
-        Vamp::HostExt::ConfigurationRequest request;
-        request.plugin = plugin;
-        request.configuration = config;
-
-        capnp::MallocMessageBuilder message;
-        RpcRequest::Builder builder = message.initRoot<RpcRequest>();
-
-        VampnProto::buildRpcRequest_Configure(builder, request, m_mapper);
-        ReqId id = getId();
-        builder.getId().setNumber(id);
-        
-        auto arr = messageToFlatArray(message);
-        m_process->write(arr.asChars().begin(), arr.asChars().size());
-        
-        QByteArray buffer = readResponseBuffer();
-	auto karr = toKJArray(buffer);
-        capnp::FlatArrayMessageReader responseMessage(karr);
-        RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
-
-        //!!! handle (explicit) error case
-
-        checkResponseType(reader, RpcResponse::Response::Which::CONFIGURE, id);
-
-        Vamp::HostExt::ConfigurationResponse cr;
-        VampnProto::readConfigurationResponse(cr,
-                                              reader.getResponse().getConfigure(),
-                                              m_mapper);
-
-        return cr.outputs;
-    };
-    
-    virtual
-    Vamp::Plugin::FeatureSet
-    process(PiperStubPlugin *plugin,
-            std::vector<std::vector<float> > inputBuffers,
-            Vamp::RealTime timestamp) {
-
-        if (!m_process) {
-            throw std::runtime_error("Piper server failed to start");
-        }
-
-        Vamp::HostExt::ProcessRequest request;
-        request.plugin = plugin;
-        request.inputBuffers = inputBuffers;
-        request.timestamp = timestamp;
-        
-        capnp::MallocMessageBuilder message;
-        RpcRequest::Builder builder = message.initRoot<RpcRequest>();
-
-        VampnProto::buildRpcRequest_Process(builder, request, m_mapper);
-        ReqId id = getId();
-        builder.getId().setNumber(id);
-        
-        auto arr = messageToFlatArray(message);
-        m_process->write(arr.asChars().begin(), arr.asChars().size());
-        
-        QByteArray buffer = readResponseBuffer();
-	auto karr = toKJArray(buffer);
-        capnp::FlatArrayMessageReader responseMessage(karr);
-        RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
-
-        //!!! handle (explicit) error case
-
-        checkResponseType(reader, RpcResponse::Response::Which::PROCESS, id);
-
-        Vamp::HostExt::ProcessResponse pr;
-        VampnProto::readProcessResponse(pr,
-                                        reader.getResponse().getProcess(),
-                                        m_mapper);
-
-        return pr.features;
-    }
-
-    virtual Vamp::Plugin::FeatureSet
-    finish(PiperStubPlugin *plugin) {
-
-        if (!m_process) {
-            throw std::runtime_error("Piper server failed to start");
-        }
-
-        Vamp::HostExt::FinishRequest request;
-        request.plugin = plugin;
-        
-        capnp::MallocMessageBuilder message;
-        RpcRequest::Builder builder = message.initRoot<RpcRequest>();
-
-        VampnProto::buildRpcRequest_Finish(builder, request, m_mapper);
-        ReqId id = getId();
-        builder.getId().setNumber(id);
-        
-        auto arr = messageToFlatArray(message);
-        m_process->write(arr.asChars().begin(), arr.asChars().size());
-        
-        QByteArray buffer = readResponseBuffer();
-	auto karr = toKJArray(buffer);
-        capnp::FlatArrayMessageReader responseMessage(karr);
-        RpcResponse::Reader reader = responseMessage.getRoot<RpcResponse>();
-
-        //!!! handle (explicit) error case
-
-        checkResponseType(reader, RpcResponse::Response::Which::FINISH, id);
-
-        Vamp::HostExt::ProcessResponse pr;
-        VampnProto::readFinishResponse(pr,
-                                       reader.getResponse().getFinish(),
-                                       m_mapper);
-
-        m_mapper.removePlugin(m_mapper.pluginToHandle(plugin));
-
-	// Don't delete the plugin. It's the plugin that is supposed
-	// to be calling us here
-        
-        return pr.features;
-    }
-
-private:
-    QProcess *m_process;
-    AssignedPluginHandleMapper m_mapper;
-    ReqId getId() {
-        //!!! todo: mutex
-        static ReqId m_nextId = 0;
-        return m_nextId++;
-    }
-
-    kj::Array<capnp::word>
-    toKJArray(QByteArray qarr) {
-	// We could do this whole thing with fewer copies, but let's
-	// see whether it matters first
-        size_t wordSize = sizeof(capnp::word);
-	size_t words = qarr.size() / wordSize;
-	kj::Array<capnp::word> karr(kj::heapArray<capnp::word>(words));
-	memcpy(karr.begin(), qarr.data(), words * wordSize);
-	return karr;
-    }
-
-    QByteArray
-    readResponseBuffer() { 
-        
-        QByteArray buffer;
-        size_t wordSize = sizeof(capnp::word);
-        bool complete = false;
-        
-        while (!complete) {
-
-            m_process->waitForReadyRead(1000);
-            qint64 byteCount = m_process->bytesAvailable();
-            qint64 wordCount = byteCount / wordSize;
-
-            if (!wordCount) {
-                if (m_process->state() == QProcess::NotRunning) {
-                    cerr << "ERROR: Subprocess exited: Load failed" << endl;
-                    throw std::runtime_error("Piper server exited unexpectedly");
-                }
-            } else {
-                buffer.append(m_process->read(wordCount * wordSize));
-                size_t haveWords = buffer.size() / wordSize;
-                size_t expectedWords =
-                    capnp::expectedSizeInWordsFromPrefix(toKJArray(buffer));
-                if (haveWords >= expectedWords) {
-                    if (haveWords > expectedWords) {
-                        cerr << "WARNING: obtained more data than expected ("
-                             << haveWords << " words, expected " << expectedWords
-                             << ")" << endl;
-                    }
-                    complete = true;
-                }
-            }
-        }
-/*
-        cerr << "buffer = ";
-        for (int i = 0; i < buffer.size(); ++i) {
-            if (i % 16 == 0) cerr << "\n";
-            cerr << int(buffer[i]) << " ";
-        }
-        cerr << "\n";
-*/        
-        return buffer;
-    }
-
-    void
-    checkResponseType(const RpcResponse::Reader &r,
-                      RpcResponse::Response::Which type,
-                      ReqId id) {
-        
-        if (r.getResponse().which() != type) {
-            throw std::runtime_error("Wrong response type");
-        }
-        if (ReqId(r.getId().getNumber()) != id) {
-            throw std::runtime_error("Wrong response id");
-        }
-    }
-};
-    
-}
-
 int main(int, char **)
 {
-    piper::PiperClient client;
+    piper::CapnpMessageCompletenessChecker checker;
+    piper::PipedQProcessTransport transport("../bin/piper-vamp-server", &checker);
+    piper::PiperCapnpClient client(&transport);
+    
     Vamp::Plugin *plugin = client.load("vamp-example-plugins:zerocrossing", 16, 0);
     if (!plugin->initialise(1, 4, 4)) {
         cerr << "initialisation failed" << endl;