Mercurial > hg > piper-cpp
view utilities/vampipe-server.cpp @ 33:0b48b10140bb
Switch to non-packed protocol and handle multiple messages and EOF properly; fill in remaining server actions
author | Chris Cannam <c.cannam@qmul.ac.uk> |
---|---|
date | Wed, 25 May 2016 10:43:07 +0100 |
parents | 2d97883d20df |
children | ba58fe5ee2dd |
line wrap: on
line source
#include "VampnProto.h" #include "bits/RequestOrResponse.h" #include <iostream> #include <sstream> #include <stdexcept> #include <map> #include <set> using namespace std; using namespace vampipe; using namespace Vamp; using namespace Vamp::HostExt; void usage() { string myname = "vampipe-server"; cerr << "\n" << myname << ": Load and run Vamp plugins in response to messages from stdin\n\n" " Usage: " << myname << "\n\n" "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n" "and writes Vamp response messages in the same format to stdout.\n\n"; exit(2); } class Mapper : public PluginHandleMapper { public: Mapper() : m_nextHandle(1) { } void addPlugin(Plugin *p) { if (m_rplugins.find(p) == m_rplugins.end()) { int32_t h = m_nextHandle++; m_plugins[h] = p; m_rplugins[p] = h; } } void removePlugin(int32_t h) { if (m_plugins.find(h) == m_plugins.end()) { throw NotFound(); } Plugin *p = m_plugins[h]; m_plugins.erase(h); m_rplugins.erase(p); } int32_t pluginToHandle(Plugin *p) { if (m_rplugins.find(p) == m_rplugins.end()) { throw NotFound(); } return m_rplugins[p]; } Plugin *handleToPlugin(int32_t h) { if (m_plugins.find(h) == m_plugins.end()) { throw NotFound(); } return m_plugins[h]; } bool isConfigured(int32_t h) { return m_configuredPlugins.find(h) != m_configuredPlugins.end(); } void markConfigured(int32_t h) { m_configuredPlugins.insert(h); } private: int32_t m_nextHandle; // NB plugin handle type must fit in JSON number map<uint32_t, Plugin *> m_plugins; map<Plugin *, uint32_t> m_rplugins; set<uint32_t> m_configuredPlugins; }; static Mapper mapper; RequestOrResponse readRequestCapnp() { RequestOrResponse rr; rr.direction = RequestOrResponse::Request; static kj::FdInputStream stream(0); // stdin static kj::BufferedInputStreamWrapper buffered(stream); if (buffered.tryGetReadBuffer() == nullptr) { rr.type = RRType::NotValid; return rr; } ::capnp::InputStreamMessageReader message(buffered); VampRequest::Reader reader = message.getRoot<VampRequest>(); rr.type = VampnProto::getRequestResponseType(reader); switch (rr.type) { case RRType::List: VampnProto::readVampRequest_List(reader); // type check only break; case RRType::Load: VampnProto::readVampRequest_Load(rr.loadRequest, reader); break; case RRType::Configure: VampnProto::readVampRequest_Configure(rr.configurationRequest, reader, mapper); break; case RRType::Process: VampnProto::readVampRequest_Process(rr.processRequest, reader, mapper); break; case RRType::Finish: VampnProto::readVampRequest_Finish(rr.finishPlugin, reader, mapper); break; case RRType::NotValid: break; } return rr; } void writeResponseCapnp(RequestOrResponse &rr) { ::capnp::MallocMessageBuilder message; VampResponse::Builder builder = message.initRoot<VampResponse>(); switch (rr.type) { case RRType::List: VampnProto::buildVampResponse_List(builder, "", rr.listResponse); break; case RRType::Load: VampnProto::buildVampResponse_Load(builder, rr.loadResponse, mapper); break; case RRType::Configure: VampnProto::buildVampResponse_Configure(builder, rr.configurationResponse); break; case RRType::Process: VampnProto::buildVampResponse_Process(builder, rr.processResponse); break; case RRType::Finish: VampnProto::buildVampResponse_Finish(builder, rr.finishResponse); break; case RRType::NotValid: break; } writeMessageToFd(1, message); } RequestOrResponse processRequest(const RequestOrResponse &request) { RequestOrResponse response; response.direction = RequestOrResponse::Response; response.type = request.type; auto loader = PluginLoader::getInstance(); switch (request.type) { case RRType::List: response.listResponse = loader->listPluginData(); response.success = true; break; case RRType::Load: response.loadResponse = loader->loadPlugin(request.loadRequest); if (response.loadResponse.plugin != nullptr) { mapper.addPlugin(response.loadResponse.plugin); response.success = true; } break; case RRType::Configure: { auto h = mapper.pluginToHandle(request.configurationRequest.plugin); if (mapper.isConfigured(h)) { throw runtime_error("plugin has already been configured"); } response.configurationResponse = loader->configurePlugin(request.configurationRequest); if (!response.configurationResponse.outputs.empty()) { mapper.markConfigured(h); response.success = true; } break; } case RRType::Process: { auto &preq = request.processRequest; int channels = int(preq.inputBuffers.size()); const float **fbuffers = new const float *[channels]; for (int i = 0; i < channels; ++i) { fbuffers[i] = preq.inputBuffers[i].data(); } response.processResponse.features = preq.plugin->process(fbuffers, preq.timestamp); response.success = true; delete[] fbuffers; break; } case RRType::Finish: { auto h = mapper.pluginToHandle(request.finishPlugin); response.finishResponse.features = request.finishPlugin->getRemainingFeatures(); mapper.removePlugin(h); delete request.finishPlugin; response.success = true; break; } case RRType::NotValid: break; } return response; } int main(int argc, char **argv) { if (argc != 1) { usage(); } while (true) { try { RequestOrResponse request = readRequestCapnp(); cerr << "vampipe-server: request received, of type " << int(request.type) << endl; // NotValid without an exception indicates EOF: if (request.type == RRType::NotValid) { cerr << "vampipe-server: eof reached" << endl; break; } RequestOrResponse response = processRequest(request); cerr << "vampipe-server: request processed, writing response" << endl; writeResponseCapnp(response); cerr << "vampipe-server: response written" << endl; } catch (std::exception &e) { cerr << "vampipe-server: error: " << e.what() << endl; exit(1); } } exit(0); }