Mercurial > hg > piper-cpp
changeset 33:0b48b10140bb
Switch to non-packed protocol and handle multiple messages and EOF properly; fill in remaining server actions
author | Chris Cannam <c.cannam@qmul.ac.uk> |
---|---|
date | Wed, 25 May 2016 10:43:07 +0100 |
parents | 2d97883d20df |
children | ba58fe5ee2dd |
files | utilities/vampipe-convert.cpp utilities/vampipe-server.cpp |
diffstat | 2 files changed, 122 insertions(+), 31 deletions(-) [+] |
line wrap: on
line diff
--- a/utilities/vampipe-convert.cpp Tue May 24 17:17:03 2016 +0100 +++ b/utilities/vampipe-convert.cpp Wed May 25 10:43:07 2016 +0100 @@ -217,12 +217,12 @@ } RequestOrResponse -readRequestCapnp() +readRequestCapnp(kj::BufferedInputStreamWrapper &buffered) { RequestOrResponse rr; rr.direction = RequestOrResponse::Request; - ::capnp::PackedFdMessageReader message(0); // stdin + ::capnp::InputStreamMessageReader message(buffered); VampRequest::Reader reader = message.getRoot<VampRequest>(); rr.type = VampnProto::getRequestResponseType(reader); @@ -280,16 +280,16 @@ break; } - writePackedMessageToFd(1, message); + writeMessageToFd(1, message); } RequestOrResponse -readResponseCapnp() +readResponseCapnp(kj::BufferedInputStreamWrapper &buffered) { RequestOrResponse rr; rr.direction = RequestOrResponse::Response; - ::capnp::PackedFdMessageReader message(0); // stdin + ::capnp::InputStreamMessageReader message(buffered); VampResponse::Reader reader = message.getRoot<VampResponse>(); rr.type = VampnProto::getRequestResponseType(reader); @@ -346,24 +346,43 @@ break; } - writePackedMessageToFd(1, message); + writeMessageToFd(1, message); +} + +RequestOrResponse +readInputJson(RequestOrResponse::Direction direction) +{ + if (direction == RequestOrResponse::Request) { + return readRequestJson(); + } else { + return readResponseJson(); + } +} + +RequestOrResponse +readInputCapnp(RequestOrResponse::Direction direction) +{ + static kj::FdInputStream stream(0); // stdin + static kj::BufferedInputStreamWrapper buffered(stream); + + if (buffered.tryGetReadBuffer() == nullptr) { + return {}; + } + + if (direction == RequestOrResponse::Request) { + return readRequestCapnp(buffered); + } else { + return readResponseCapnp(buffered); + } } RequestOrResponse readInput(string format, RequestOrResponse::Direction direction) { if (format == "json") { - if (direction == RequestOrResponse::Request) { - return readRequestJson(); - } else { - return readResponseJson(); - } + return readInputJson(direction); } else if (format == "capnp") { - if (direction == RequestOrResponse::Request) { - return readRequestCapnp(); - } else { - return readResponseCapnp(); - } + return readInputCapnp(direction); } else { throw runtime_error("unknown input format \"" + format + "\""); }
--- a/utilities/vampipe-server.cpp Tue May 24 17:17:03 2016 +0100 +++ b/utilities/vampipe-server.cpp Wed May 25 10:43:07 2016 +0100 @@ -39,6 +39,15 @@ m_rplugins[p] = h; } } + + void removePlugin(int32_t h) { + if (m_plugins.find(h) == m_plugins.end()) { + throw NotFound(); + } + Plugin *p = m_plugins[h]; + m_plugins.erase(h); + m_rplugins.erase(p); + } int32_t pluginToHandle(Plugin *p) { if (m_rplugins.find(p) == m_rplugins.end()) { @@ -54,19 +63,19 @@ return m_plugins[h]; } - bool isInitialised(int32_t h) { - return m_initialisedPlugins.find(h) != m_initialisedPlugins.end(); + bool isConfigured(int32_t h) { + return m_configuredPlugins.find(h) != m_configuredPlugins.end(); } - void markInitialised(int32_t h) { - m_initialisedPlugins.insert(h); + void markConfigured(int32_t h) { + m_configuredPlugins.insert(h); } private: int32_t m_nextHandle; // NB plugin handle type must fit in JSON number map<uint32_t, Plugin *> m_plugins; map<Plugin *, uint32_t> m_rplugins; - set<uint32_t> m_initialisedPlugins; + set<uint32_t> m_configuredPlugins; }; static Mapper mapper; @@ -77,7 +86,15 @@ RequestOrResponse rr; rr.direction = RequestOrResponse::Request; - ::capnp::PackedFdMessageReader message(0); // stdin + static kj::FdInputStream stream(0); // stdin + static kj::BufferedInputStreamWrapper buffered(stream); + + if (buffered.tryGetReadBuffer() == nullptr) { + rr.type = RRType::NotValid; + return rr; + } + + ::capnp::InputStreamMessageReader message(buffered); VampRequest::Reader reader = message.getRoot<VampRequest>(); rr.type = VampnProto::getRequestResponseType(reader); @@ -134,7 +151,7 @@ break; } - writePackedMessageToFd(1, message); + writeMessageToFd(1, message); } RequestOrResponse @@ -161,9 +178,55 @@ } break; - default: - //!!! - ; + case RRType::Configure: + { + auto h = mapper.pluginToHandle(request.configurationRequest.plugin); + if (mapper.isConfigured(h)) { + throw runtime_error("plugin has already been configured"); + } + + response.configurationResponse = + loader->configurePlugin(request.configurationRequest); + + if (!response.configurationResponse.outputs.empty()) { + mapper.markConfigured(h); + response.success = true; + } + break; + } + + case RRType::Process: + { + auto &preq = request.processRequest; + int channels = int(preq.inputBuffers.size()); + const float **fbuffers = new const float *[channels]; + for (int i = 0; i < channels; ++i) { + fbuffers[i] = preq.inputBuffers[i].data(); + } + + response.processResponse.features = + preq.plugin->process(fbuffers, preq.timestamp); + response.success = true; + + delete[] fbuffers; + break; + } + + case RRType::Finish: + { + auto h = mapper.pluginToHandle(request.finishPlugin); + + response.finishResponse.features = + request.finishPlugin->getRemainingFeatures(); + + mapper.removePlugin(h); + delete request.finishPlugin; + response.success = true; + break; + } + + case RRType::NotValid: + break; } return response; @@ -181,18 +244,27 @@ RequestOrResponse request = readRequestCapnp(); + cerr << "vampipe-server: request received, of type " + << int(request.type) + << endl; + // NotValid without an exception indicates EOF: - - //!!! not yet it doesn't -- have to figure out how to - //!!! handle this with capnp - if (request.type == RRType::NotValid) break; + if (request.type == RRType::NotValid) { + cerr << "vampipe-server: eof reached" << endl; + break; + } RequestOrResponse response = processRequest(request); + + cerr << "vampipe-server: request processed, writing response" + << endl; writeResponseCapnp(response); + + cerr << "vampipe-server: response written" << endl; } catch (std::exception &e) { - cerr << "Error: " << e.what() << endl; + cerr << "vampipe-server: error: " << e.what() << endl; exit(1); } }