changeset 33:0b48b10140bb

Switch to non-packed protocol and handle multiple messages and EOF properly; fill in remaining server actions
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 25 May 2016 10:43:07 +0100
parents 2d97883d20df
children ba58fe5ee2dd
files utilities/vampipe-convert.cpp utilities/vampipe-server.cpp
diffstat 2 files changed, 122 insertions(+), 31 deletions(-) [+]
line wrap: on
line diff
--- a/utilities/vampipe-convert.cpp	Tue May 24 17:17:03 2016 +0100
+++ b/utilities/vampipe-convert.cpp	Wed May 25 10:43:07 2016 +0100
@@ -217,12 +217,12 @@
 }
 
 RequestOrResponse
-readRequestCapnp()
+readRequestCapnp(kj::BufferedInputStreamWrapper &buffered)
 {
     RequestOrResponse rr;
     rr.direction = RequestOrResponse::Request;
 
-    ::capnp::PackedFdMessageReader message(0); // stdin
+    ::capnp::InputStreamMessageReader message(buffered);
     VampRequest::Reader reader = message.getRoot<VampRequest>();
     
     rr.type = VampnProto::getRequestResponseType(reader);
@@ -280,16 +280,16 @@
 	break;
     }
 
-    writePackedMessageToFd(1, message);
+    writeMessageToFd(1, message);
 }
 
 RequestOrResponse
-readResponseCapnp()
+readResponseCapnp(kj::BufferedInputStreamWrapper &buffered)
 {
     RequestOrResponse rr;
     rr.direction = RequestOrResponse::Response;
 
-    ::capnp::PackedFdMessageReader message(0); // stdin
+    ::capnp::InputStreamMessageReader message(buffered);
     VampResponse::Reader reader = message.getRoot<VampResponse>();
     
     rr.type = VampnProto::getRequestResponseType(reader);
@@ -346,24 +346,43 @@
 	break;
     }
 
-    writePackedMessageToFd(1, message);
+    writeMessageToFd(1, message);
+}
+
+RequestOrResponse
+readInputJson(RequestOrResponse::Direction direction)
+{
+    if (direction == RequestOrResponse::Request) {
+	return readRequestJson();
+    } else {
+	return readResponseJson();
+    }
+}
+
+RequestOrResponse
+readInputCapnp(RequestOrResponse::Direction direction)
+{
+    static kj::FdInputStream stream(0); // stdin
+    static kj::BufferedInputStreamWrapper buffered(stream);
+
+    if (buffered.tryGetReadBuffer() == nullptr) {
+	return {};
+    }
+    
+    if (direction == RequestOrResponse::Request) {
+	return readRequestCapnp(buffered);
+    } else {
+	return readResponseCapnp(buffered);
+    }
 }
 
 RequestOrResponse
 readInput(string format, RequestOrResponse::Direction direction)
 {
     if (format == "json") {
-	if (direction == RequestOrResponse::Request) {
-	    return readRequestJson();
-	} else {
-	    return readResponseJson();
-	}
+	return readInputJson(direction);
     } else if (format == "capnp") {
-	if (direction == RequestOrResponse::Request) {
-	    return readRequestCapnp();
-	} else {
-	    return readResponseCapnp();
-	}
+	return readInputCapnp(direction);
     } else {
 	throw runtime_error("unknown input format \"" + format + "\"");
     }
--- a/utilities/vampipe-server.cpp	Tue May 24 17:17:03 2016 +0100
+++ b/utilities/vampipe-server.cpp	Wed May 25 10:43:07 2016 +0100
@@ -39,6 +39,15 @@
 	    m_rplugins[p] = h;
 	}
     }
+
+    void removePlugin(int32_t h) {
+	if (m_plugins.find(h) == m_plugins.end()) {
+	    throw NotFound();
+	}
+	Plugin *p = m_plugins[h];
+	m_plugins.erase(h);
+	m_rplugins.erase(p);
+    }
     
     int32_t pluginToHandle(Plugin *p) {
 	if (m_rplugins.find(p) == m_rplugins.end()) {
@@ -54,19 +63,19 @@
 	return m_plugins[h];
     }
 
-    bool isInitialised(int32_t h) {
-	return m_initialisedPlugins.find(h) != m_initialisedPlugins.end();
+    bool isConfigured(int32_t h) {
+	return m_configuredPlugins.find(h) != m_configuredPlugins.end();
     }
 
-    void markInitialised(int32_t h) {
-	m_initialisedPlugins.insert(h);
+    void markConfigured(int32_t h) {
+	m_configuredPlugins.insert(h);
     }
     
 private:
     int32_t m_nextHandle; // NB plugin handle type must fit in JSON number
     map<uint32_t, Plugin *> m_plugins;
     map<Plugin *, uint32_t> m_rplugins;
-    set<uint32_t> m_initialisedPlugins;
+    set<uint32_t> m_configuredPlugins;
 };
 
 static Mapper mapper;
@@ -77,7 +86,15 @@
     RequestOrResponse rr;
     rr.direction = RequestOrResponse::Request;
 
-    ::capnp::PackedFdMessageReader message(0); // stdin
+    static kj::FdInputStream stream(0); // stdin
+    static kj::BufferedInputStreamWrapper buffered(stream);
+
+    if (buffered.tryGetReadBuffer() == nullptr) {
+	rr.type = RRType::NotValid;
+	return rr;
+    }
+
+    ::capnp::InputStreamMessageReader message(buffered);
     VampRequest::Reader reader = message.getRoot<VampRequest>();
     
     rr.type = VampnProto::getRequestResponseType(reader);
@@ -134,7 +151,7 @@
 	break;
     }
 
-    writePackedMessageToFd(1, message);
+    writeMessageToFd(1, message);
 }
 
 RequestOrResponse
@@ -161,9 +178,55 @@
 	}
 	break;
 	
-    default:
-	//!!!
-	;
+    case RRType::Configure:
+    {
+	auto h = mapper.pluginToHandle(request.configurationRequest.plugin);
+	if (mapper.isConfigured(h)) {
+	    throw runtime_error("plugin has already been configured");
+	}
+
+	response.configurationResponse =
+	    loader->configurePlugin(request.configurationRequest);
+	
+	if (!response.configurationResponse.outputs.empty()) {
+	    mapper.markConfigured(h);
+	    response.success = true;
+	}
+	break;
+    }
+
+    case RRType::Process:
+    {
+	auto &preq = request.processRequest;
+	int channels = int(preq.inputBuffers.size());
+	const float **fbuffers = new const float *[channels];
+	for (int i = 0; i < channels; ++i) {
+	    fbuffers[i] = preq.inputBuffers[i].data();
+	}
+
+	response.processResponse.features =
+	    preq.plugin->process(fbuffers, preq.timestamp);
+	response.success = true;
+
+	delete[] fbuffers;
+	break;
+    }
+
+    case RRType::Finish:
+    {
+	auto h = mapper.pluginToHandle(request.finishPlugin);
+
+	response.finishResponse.features =
+	    request.finishPlugin->getRemainingFeatures();
+	    
+	mapper.removePlugin(h);
+	delete request.finishPlugin;
+	response.success = true;
+	break;
+    }
+
+    case RRType::NotValid:
+	break;
     }
     
     return response;
@@ -181,18 +244,27 @@
 
 	    RequestOrResponse request = readRequestCapnp();
 
+	    cerr << "vampipe-server: request received, of type "
+		 << int(request.type)
+		 << endl;
+	    
 	    // NotValid without an exception indicates EOF:
-
-	    //!!! not yet it doesn't -- have to figure out how to
-	    //!!! handle this with capnp
-	    if (request.type == RRType::NotValid) break;
+	    if (request.type == RRType::NotValid) {
+		cerr << "vampipe-server: eof reached" << endl;
+		break;
+	    }
 
 	    RequestOrResponse response = processRequest(request);
+
+	    cerr << "vampipe-server: request processed, writing response"
+		 << endl;
 	    
 	    writeResponseCapnp(response);
+
+	    cerr << "vampipe-server: response written" << endl;
 	    
 	} catch (std::exception &e) {
-	    cerr << "Error: " << e.what() << endl;
+	    cerr << "vampipe-server: error: " << e.what() << endl;
 	    exit(1);
 	}
     }