changeset 126:2004ec2b653e
Ensure we read right up to end of buffered data after server exits; adjust waiting schedule on Windows (where waitForReadyRead is far too wasteful of sleep time)
author    Chris Cannam <c.cannam@qmul.ac.uk>
date      Fri, 28 Oct 2016 14:31:58 +0100
parents   ea06fae1567c
children  5b113c87b6e6 74a7c2a8d6b6
files     vamp-client/CapnpRRClient.h vamp-client/ProcessQtTransport.h vamp-client/SynchronousTransport.h
diffstat  3 files changed, 68 insertions(+), 27 deletions(-)
--- a/vamp-client/CapnpRRClient.h	Fri Oct 28 11:08:17 2016 +0100
+++ b/vamp-client/CapnpRRClient.h	Fri Oct 28 14:31:58 2016 +0100
@@ -102,7 +102,7 @@
     listPluginData() override {

         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }

         capnp::MallocMessageBuilder message;
@@ -111,7 +111,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);

-        auto karr = call(message);
+        auto karr = call(message, true);

         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -127,7 +127,7 @@
     loadPlugin(const LoadRequest &req) override {

         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }

         LoadResponse resp;
@@ -158,7 +158,7 @@
               PluginConfiguration config) override {

         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }

         ConfigurationRequest request;
@@ -172,7 +172,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);

-        auto karr = call(message);
+        auto karr = call(message, true);

         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -196,7 +196,7 @@
             Vamp::RealTime timestamp) override {

         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }

         ProcessRequest request;
@@ -210,7 +210,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);

-        auto karr = call(message);
+        auto karr = call(message, false);

         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -231,7 +231,7 @@
     finish(PluginStub *plugin) override {

         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }

         FinishRequest request;
@@ -244,7 +244,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);

-        auto karr = call(message);
+        auto karr = call(message, true);

         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -273,7 +273,7 @@
         // Reload the plugin on the server side, and configure it as requested

         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }

         if (m_mapper.havePlugin(plugin)) {
@@ -334,10 +334,11 @@
     }

     kj::Array<capnp::word>
-    call(capnp::MallocMessageBuilder &message) {
+    call(capnp::MallocMessageBuilder &message, bool slow) {
         auto arr = capnp::messageToFlatArray(message);
         auto responseBuffer = m_transport->call(arr.asChars().begin(),
-                                                arr.asChars().size());
+                                                arr.asChars().size(),
+                                                slow);
         return toKJArray(responseBuffer);
     }

@@ -358,7 +359,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);

-        auto karr = call(message);
+        auto karr = call(message, false);

         //!!! ... --> will also need some way to kill this process
         //!!! (from another thread)
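Note on the client-side change above: one-off requests (list, configure, finish) now pass slow = true to the transport, while the per-block process path passes slow = false, so only the hot path pays for tight waiting. Nothing changes for code using the client itself; purely for illustration, a host might drive it like this (the client, plugin and buffer names are hypothetical, not part of this changeset):

    // Hypothetical host-side usage of CapnpRRClient after this change.
    // A crashed server now surfaces both through isOK() on the transport
    // (checked before each request) and through the exception thrown
    // while a request is in flight.
    try {
        auto features = client.process(plugin, channelBuffers, timestamp);
        // ... consume features ...
    } catch (const std::runtime_error &e) {
        // e.g. "Piper server crashed or failed to start"
        std::cerr << "Piper request failed: " << e.what() << std::endl;
    }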
--- a/vamp-client/ProcessQtTransport.h	Fri Oct 28 11:08:17 2016 +0100
+++ b/vamp-client/ProcessQtTransport.h	Fri Oct 28 14:31:58 2016 +0100
@@ -64,7 +64,8 @@
 {
 public:
     ProcessQtTransport(std::string processName, std::string formatArg) :
-        m_completenessChecker(0) {
+        m_completenessChecker(0),
+        m_crashed(false) {

         m_process = new QProcess();
         m_process->setReadChannel(QProcess::StandardOutput);
@@ -110,29 +111,31 @@
     void
     setCompletenessChecker(MessageCompletenessChecker *checker) override {
-        //!!! ownership?
         m_completenessChecker = checker;
     }

     bool
     isOK() const override {
-        return m_process != nullptr;
+        return (m_process != nullptr) && !m_crashed;
     }

     std::vector<char>
-    call(const char *ptr, size_t size) override {
+    call(const char *ptr, size_t size, bool slow) override {

         QMutexLocker locker(&m_mutex);

         if (!m_completenessChecker) {
             throw std::logic_error("No completeness checker set on transport");
         }
+        if (!isOK()) {
+            throw std::logic_error("Transport is not OK");
+        }

 #ifdef DEBUG_TRANSPORT
         std::cerr << "writing " << size << " bytes to server" << std::endl;
 #endif
         m_process->write(ptr, size);
-        m_process->waitForBytesWritten(1000);
+        m_process->waitForBytesWritten();

         std::vector<char> buffer;
         bool complete = false;
@@ -145,9 +148,25 @@
 #ifdef DEBUG_TRANSPORT
                 std::cerr << "waiting for data from server..." << std::endl;
 #endif
-                m_process->waitForReadyRead(1000);
-
-                if (m_process->state() == QProcess::NotRunning) {
+                if (slow) {
+                    m_process->waitForReadyRead(1000);
+                } else {
+#ifdef _WIN32
+                    // This is most unsatisfactory -- if we give a non-zero
+                    // arg here, then we end up sleeping way beyond the arrival
+                    // of the data to read -- can end up using less than 10%
+                    // CPU during processing which is crazy. So for Windows
+                    // only, we busy-wait during "fast" calls. It works out
+                    // much faster in the end. Could do with a simpler native
+                    // blocking API really.
+                    m_process->waitForReadyRead(0);
+#else
+                    m_process->waitForReadyRead(100);
+#endif
+                }
+                if (m_process->state() == QProcess::NotRunning &&
+                    // don't give up until we've read all that's been buffered!
+                    !m_process->bytesAvailable()) {
                     QProcess::ProcessError err = m_process->error();
                     if (err == QProcess::Crashed) {
                         std::cerr << "Server crashed during request" << std::endl;
@@ -155,6 +174,7 @@
                         std::cerr << "Server failed during request with error code "
                                   << err << std::endl;
                     }
+                    m_crashed = true;
                     throw ServerCrashed();
                 }
             } else {
@@ -172,6 +192,7 @@
     MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently)
     QProcess *m_process; // I own this
     QMutex m_mutex;
+    bool m_crashed;
 };

 }
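The substance of the change is in the waiting loop above: for fast (per-process-block) calls, waitForReadyRead is given a zero timeout on Windows because a non-zero timeout sleeps far past the data's arrival, and a server that has exited is only treated as crashed once its remaining buffered output has been drained. A condensed sketch of the resulting loop shape follows; the read branch and the isComplete() call on the checker are filled in from surrounding context, not shown in the hunk above, so treat them as assumptions:

    // Condensed sketch of the revised read loop inside call() (simplified;
    // the completeness-checker call is assumed from surrounding code).
    std::vector<char> buffer;
    bool complete = false;

    while (!complete) {
        qint64 byteCount = m_process->bytesAvailable();
        if (!byteCount) {
            // Nothing buffered yet: idle according to the slow hint
            if (slow) {
                m_process->waitForReadyRead(1000);
            } else {
    #ifdef _WIN32
                m_process->waitForReadyRead(0);   // busy-wait: non-zero timeouts oversleep on Windows
    #else
                m_process->waitForReadyRead(100);
    #endif
            }
            // Give up only when the server has exited *and* its output is drained
            if (m_process->state() == QProcess::NotRunning &&
                !m_process->bytesAvailable()) {
                m_crashed = true;
                throw ServerCrashed();
            }
        } else {
            // Data available: append it, then ask the checker whether the
            // reply is complete yet
            size_t formerSize = buffer.size();
            buffer.resize(formerSize + byteCount);
            m_process->read(buffer.data() + formerSize, byteCount);
            complete = m_completenessChecker->isComplete(buffer);
        }
    }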
--- a/vamp-client/SynchronousTransport.h	Fri Oct 28 11:08:17 2016 +0100
+++ b/vamp-client/SynchronousTransport.h	Fri Oct 28 14:31:58 2016 +0100
@@ -62,14 +62,33 @@
 public:
     virtual ~SynchronousTransport() = default;

-    //!!! I do not take ownership
+    /**
+     * Set a completeness checker object. The caller retains ownership
+     * of the checker and must ensure its lifespan outlives the transport.
+     */
     virtual void setCompletenessChecker(MessageCompletenessChecker *) = 0;
-
-    //!!! how to handle errors -- exception or return value? often an
-    //!!! error (e.g. server has exited) may mean the transport can no
-    //!!! longer be used at all
-    virtual std::vector<char> call(const char *data, size_t bytes) = 0;
+    /**
+     * Make a synchronous call, passing a serialised request in the data array
+     * of length bytes, and return the result.
+     *
+     * The slow flag is a hint that the recipient may take longer than usual
+     * to process this request and so the caller may wish to be more relaxed
+     * about idling to wait for the reply. (This shouldn't make any difference
+     * with a sensible blocking network API, but you never know...)
+     *
+     * May throw ServerCrashed if the server endpoint disappeared during the
+     * call. Throws std::logic_error if isOK() is not true at the time of
+     * calling, so check that before you call.
+     */
+    virtual std::vector<char> call(const char *data, size_t bytes, bool slow) = 0;
+
+    /**
+     * Check whether the transport was initialised correctly and is working.
+     * This will return false if the endpoint could not be initialised or
+     * the endpoint service has crashed or become unavailable. Always check
+     * this before using call().
+     */
     virtual bool isOK() const = 0;
 };
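The new documentation pins down the calling contract: check isOK() before each call, pass the slow hint, and treat ServerCrashed as fatal for the transport. A minimal caller respecting that contract might look like the sketch below (sendRequest and its parameter names are illustrative only, not part of this changeset):

    // Hypothetical wrapper around a SynchronousTransport, following the
    // contract documented above.
    std::vector<char>
    sendRequest(SynchronousTransport *transport,
                const std::vector<char> &request,
                bool expectedToBeSlow)
    {
        if (!transport->isOK()) {
            // call() would throw std::logic_error here, so fail up front
            throw std::runtime_error("transport endpoint unavailable");
        }
        try {
            return transport->call(request.data(), request.size(),
                                   expectedToBeSlow);
        } catch (const ServerCrashed &) {
            // The endpoint died mid-request; isOK() now returns false and
            // the transport should not be reused
            throw;
        }
    }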