changeset 126:2004ec2b653e

Ensure we read right up to end of buffered data after server exits; adjust waiting schedule on Windows (where waitForReadyRead is far too wasteful of sleep time)
author Chris Cannam <c.cannam@qmul.ac.uk>
date Fri, 28 Oct 2016 14:31:58 +0100
parents ea06fae1567c
children 5b113c87b6e6 74a7c2a8d6b6
files vamp-client/CapnpRRClient.h vamp-client/ProcessQtTransport.h vamp-client/SynchronousTransport.h
diffstat 3 files changed, 68 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/vamp-client/CapnpRRClient.h	Fri Oct 28 11:08:17 2016 +0100
+++ b/vamp-client/CapnpRRClient.h	Fri Oct 28 14:31:58 2016 +0100
@@ -102,7 +102,7 @@
     listPluginData() override {
 
         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }
 
         capnp::MallocMessageBuilder message;
@@ -111,7 +111,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);
 
-        auto karr = call(message);
+        auto karr = call(message, true);
 
         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -127,7 +127,7 @@
     loadPlugin(const LoadRequest &req) override {
 
         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }
 
         LoadResponse resp;
@@ -158,7 +158,7 @@
               PluginConfiguration config) override {
 
         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }
 
         ConfigurationRequest request;
@@ -172,7 +172,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);
 
-        auto karr = call(message);
+        auto karr = call(message, true);
 
         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -196,7 +196,7 @@
             Vamp::RealTime timestamp) override {
 
         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }
 
         ProcessRequest request;
@@ -210,7 +210,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);
 
-        auto karr = call(message);
+        auto karr = call(message, false);
 
         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -231,7 +231,7 @@
     finish(PluginStub *plugin) override {
 
         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }
 
         FinishRequest request;
@@ -244,7 +244,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);
         
-        auto karr = call(message);
+        auto karr = call(message, true);
 
         capnp::FlatArrayMessageReader responseMessage(karr);
         piper::RpcResponse::Reader reader = responseMessage.getRoot<piper::RpcResponse>();
@@ -273,7 +273,7 @@
         // Reload the plugin on the server side, and configure it as requested
         
         if (!m_transport->isOK()) {
-            throw std::runtime_error("Piper server failed to start");
+            throw std::runtime_error("Piper server crashed or failed to start");
         }
 
         if (m_mapper.havePlugin(plugin)) {
@@ -334,10 +334,11 @@
     }
 
     kj::Array<capnp::word>
-    call(capnp::MallocMessageBuilder &message) {
+    call(capnp::MallocMessageBuilder &message, bool slow) {
         auto arr = capnp::messageToFlatArray(message);
         auto responseBuffer = m_transport->call(arr.asChars().begin(),
-                                                arr.asChars().size());
+                                                arr.asChars().size(),
+                                                slow);
         return toKJArray(responseBuffer);
     }
     
@@ -358,7 +359,7 @@
         ReqId id = getId();
         builder.getId().setNumber(id);
 
-        auto karr = call(message);
+        auto karr = call(message, false);
 
         //!!! ... --> will also need some way to kill this process
         //!!! (from another thread)
--- a/vamp-client/ProcessQtTransport.h	Fri Oct 28 11:08:17 2016 +0100
+++ b/vamp-client/ProcessQtTransport.h	Fri Oct 28 14:31:58 2016 +0100
@@ -64,7 +64,8 @@
 {
 public:
     ProcessQtTransport(std::string processName, std::string formatArg) :
-        m_completenessChecker(0) {
+        m_completenessChecker(0),
+        m_crashed(false) {
 
         m_process = new QProcess();
         m_process->setReadChannel(QProcess::StandardOutput);
@@ -110,29 +111,31 @@
 
     void
     setCompletenessChecker(MessageCompletenessChecker *checker) override {
-        //!!! ownership?
         m_completenessChecker = checker;
     }
     
     bool
     isOK() const override {
-        return m_process != nullptr;
+        return (m_process != nullptr) && !m_crashed;
     }
     
     std::vector<char>
-    call(const char *ptr, size_t size) override {
+    call(const char *ptr, size_t size, bool slow) override {
 
         QMutexLocker locker(&m_mutex);
         
         if (!m_completenessChecker) {
             throw std::logic_error("No completeness checker set on transport");
         }
+        if (!isOK()) {
+            throw std::logic_error("Transport is not OK");
+        }
         
 #ifdef DEBUG_TRANSPORT
         std::cerr << "writing " << size << " bytes to server" << std::endl;
 #endif
         m_process->write(ptr, size);
-        m_process->waitForBytesWritten(1000);
+        m_process->waitForBytesWritten();
         
         std::vector<char> buffer;
         bool complete = false;
@@ -145,9 +148,25 @@
 #ifdef DEBUG_TRANSPORT
                 std::cerr << "waiting for data from server..." << std::endl;
 #endif
-                m_process->waitForReadyRead(1000);
-                
-                if (m_process->state() == QProcess::NotRunning) {
+                if (slow) {
+                    m_process->waitForReadyRead(1000);
+                } else {
+#ifdef _WIN32
+                    // This is most unsatisfactory -- if we give a non-zero
+                    // arg here, then we end up sleeping way beyond the arrival
+                    // of the data to read -- can end up using less than 10%
+                    // CPU during processing which is crazy. So for Windows
+                    // only, we busy-wait during "fast" calls. It works out
+                    // much faster in the end. Could do with a simpler native
+                    // blocking API really.
+                    m_process->waitForReadyRead(0);
+#else
+                    m_process->waitForReadyRead(100);
+#endif
+                }
+                if (m_process->state() == QProcess::NotRunning &&
+                    // don't give up until we've read all that's been buffered!
+                    !m_process->bytesAvailable()) {
                     QProcess::ProcessError err = m_process->error();
                     if (err == QProcess::Crashed) {
                         std::cerr << "Server crashed during request" << std::endl;
@@ -155,6 +174,7 @@
                         std::cerr << "Server failed during request with error code "
                                   << err << std::endl;
                     }
+                    m_crashed = true;
                     throw ServerCrashed();
                 }
             } else {
@@ -172,6 +192,7 @@
     MessageCompletenessChecker *m_completenessChecker; //!!! I don't own this (currently)
     QProcess *m_process; // I own this
     QMutex m_mutex;
+    bool m_crashed;
 };
 
 }
--- a/vamp-client/SynchronousTransport.h	Fri Oct 28 11:08:17 2016 +0100
+++ b/vamp-client/SynchronousTransport.h	Fri Oct 28 14:31:58 2016 +0100
@@ -62,14 +62,33 @@
 public:
     virtual ~SynchronousTransport() = default;
     
-    //!!! I do not take ownership
+    /**
+     * Set a completeness checker object. The caller retains ownership
+     * of the checker and must ensure its lifespan outlives the transport.
+     */
     virtual void setCompletenessChecker(MessageCompletenessChecker *) = 0;
-    
-    //!!! how to handle errors -- exception or return value? often an
-    //!!! error (e.g. server has exited) may mean the transport can no
-    //!!! longer be used at all
-    virtual std::vector<char> call(const char *data, size_t bytes) = 0;
 
+    /**
+     * Make a synchronous call, passing a serialised request in the data array
+     * of length bytes, and return the result.
+     *
+     * The slow flag is a hint that the recipient may take longer than usual
+     * to process this request and so the caller may wish to be more relaxed
+     * about idling to wait for the reply. (This shouldn't make any difference
+     * with a sensible blocking network API, but you never know...)
+     *
+     * May throw ServerCrashed if the server endpoint disappeared during the
+     * call. Throws std::logic_error if isOK() is not true at the time of
+     * calling, so check that before you call.
+     */
+    virtual std::vector<char> call(const char *data, size_t bytes, bool slow) = 0;
+
+    /**
+     * Check whether the transport was initialised correctly and is working.
+     * This will return false if the endpoint could not be initialised or
+     * the endpoint service has crashed or become unavailable. Always check
+     * this before using call().
+     */
     virtual bool isOK() const = 0;
 };