Mercurial > hg > piper-cpp
view utilities/vampipe-server.cpp @ 66:6f160dee1192
Instead of using separate values and b64values entries in JSON serialisations, allow numeric arrays to be replaced by b64 variants wherever they appear (discriminating by type). Also rename values to featureValues in feature throughout, as values turns out to be a hazardous name in a JS context. Finally use Array instead of Text for array encoding (seems clearer).
author | Chris Cannam <c.cannam@qmul.ac.uk> |
---|---|
date | Tue, 27 Sep 2016 15:04:59 +0100 |
parents | 815e94fedc1c |
children | a5ba837bca28 |
line wrap: on
line source
#include "VampnProto.h"
#include "bits/RequestOrResponse.h"
#include "bits/CountingPluginHandleMapper.h"

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <map>
#include <set>
#include <vector>

using namespace std;
using namespace vampipe;
using namespace Vamp;
using namespace Vamp::HostExt;

/**
 * Print a short usage message to stderr and terminate the process
 * with exit status 2. Does not return.
 */
void usage()
{
    string myname = "vampipe-server";
    // NOTE(review): the text below says "packed", but this file reads
    // with ::capnp::InputStreamMessageReader and writes with
    // writeMessageToFd rather than the packed variants -- confirm
    // which format is actually intended.
    cerr << "\n" << myname <<
        ": Load and run Vamp plugins in response to messages from stdin\n\n"
        " Usage: " << myname << "\n\n"
        "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
        "and writes Vamp response messages in the same format to stdout.\n\n";

    exit(2);
}

// Shared by the reader, the request handler and the writer: translates
// between plugin pointers and handles, and records whether (and with
// what channel count and block size) each plugin has been configured.
static CountingPluginHandleMapper mapper;

/**
 * Read a single request message from stdin and unpack it into a
 * RequestOrResponse.
 *
 * Returns an object whose type is RRType::NotValid if no further
 * input is available; main() treats that as end-of-file. Errors
 * raised while decoding the message propagate to the caller as
 * exceptions.
 */
RequestOrResponse
readRequestCapnp()
{
    RequestOrResponse rr;
    rr.direction = RequestOrResponse::Request;

    // These are static so that buffered-but-unconsumed input is
    // carried over from one call to the next.
    static kj::FdInputStream stream(0); // stdin
    static kj::BufferedInputStreamWrapper buffered(stream);

    if (buffered.tryGetReadBuffer() == nullptr) {
        rr.type = RRType::NotValid;
        return rr;
    }

    ::capnp::InputStreamMessageReader message(buffered);
    VampRequest::Reader reader = message.getRoot<VampRequest>();

    rr.type = VampnProto::getRequestResponseType(reader);

    switch (rr.type) {

    case RRType::List:
        VampnProto::readVampRequest_List(reader); // type check only
        break;
    case RRType::Load:
        VampnProto::readVampRequest_Load(rr.loadRequest, reader);
        break;
    case RRType::Configure:
        VampnProto::readVampRequest_Configure(rr.configurationRequest,
                                              reader, mapper);
        break;
    case RRType::Process:
        VampnProto::readVampRequest_Process(rr.processRequest,
                                            reader, mapper);
        break;
    case RRType::Finish:
        VampnProto::readVampRequest_Finish(rr.finishRequest,
                                           reader, mapper);
        break;
    case RRType::NotValid:
        break;
    }

    return rr;
}

/**
 * Serialise the given response and write it to stdout (fd 1).
 *
 * If rr.success is false, an error response carrying rr.errorText is
 * written instead of the type-specific payload.
 */
void
writeResponseCapnp(RequestOrResponse &rr)
{
    ::capnp::MallocMessageBuilder message;
    VampResponse::Builder builder = message.initRoot<VampResponse>();

    if (!rr.success) {

        VampnProto::buildVampResponse_Error(builder, rr.errorText, rr.type);

    } else {

        switch (rr.type) {

        case RRType::List:
            VampnProto::buildVampResponse_List(builder, rr.listResponse);
            break;
        case RRType::Load:
            VampnProto::buildVampResponse_Load(builder, rr.loadResponse,
                                               mapper);
            break;
        case RRType::Configure:
            VampnProto::buildVampResponse_Configure(builder,
                                                    rr.configurationResponse,
                                                    mapper);
            break;
        case RRType::Process:
            VampnProto::buildVampResponse_Process(builder,
                                                  rr.processResponse,
                                                  mapper);
            break;
        case RRType::Finish:
            VampnProto::buildVampResponse_Finish(builder,
                                                 rr.finishResponse,
                                                 mapper);
            break;
        case RRType::NotValid:
            break;
        }
    }

    writeMessageToFd(1, message);
}

/**
 * Write a response to stdout (fd 1) describing the exception e, which
 * was raised while dealing with a request of the given type.
 */
void
writeExceptionCapnp(const std::exception &e, RRType type)
{
    ::capnp::MallocMessageBuilder message;
    VampResponse::Builder builder = message.initRoot<VampResponse>();
    VampnProto::buildVampResponse_Exception(builder, e, type);

    writeMessageToFd(1, message);
}

/**
 * Carry out the given request and return the corresponding response.
 *
 * Throws std::runtime_error if a plugin is configured twice, if
 * process is requested for a plugin that has not been configured, or
 * if the supplied channel count or block size does not match the
 * values recorded at configuration time.
 *
 * A Finish request does not delete the plugin here; that is left to
 * the caller, because the plugin must still be known to the mapper
 * while the response is being serialised.
 */
RequestOrResponse
handleRequest(const RequestOrResponse &request)
{
    RequestOrResponse response;
    response.direction = RequestOrResponse::Response;
    response.type = request.type;

    auto loader = PluginLoader::getInstance();

    switch (request.type) {

    case RRType::List:
        response.listResponse = loader->listPluginData();
        response.success = true;
        break;

    case RRType::Load:
        response.loadResponse = loader->loadPlugin(request.loadRequest);
        // NOTE(review): when loading fails, success is left at whatever
        // RequestOrResponse initialises it to and errorText is not set
        // -- confirm the defaults give a useful error response.
        if (response.loadResponse.plugin != nullptr) {
            mapper.addPlugin(response.loadResponse.plugin);
            response.success = true;
        }
        break;

    case RRType::Configure:
    {
        auto &creq = request.configurationRequest;
        auto h = mapper.pluginToHandle(creq.plugin);
        if (mapper.isConfigured(h)) {
            throw runtime_error("plugin has already been configured");
        }

        response.configurationResponse = loader->configurePlugin(creq);

        // Configuration is considered successful only if it yielded
        // at least one output.
        if (!response.configurationResponse.outputs.empty()) {
            mapper.markConfigured
                (h, creq.configuration.channelCount,
                 creq.configuration.blockSize);
            response.success = true;
        }
        break;
    }

    case RRType::Process:
    {
        auto &preq = request.processRequest;
        auto h = mapper.pluginToHandle(preq.plugin);
        if (!mapper.isConfigured(h)) {
            throw runtime_error("plugin has not been configured");
        }

        int channels = int(preq.inputBuffers.size());
        if (channels != mapper.getChannelCount(h)) {
            throw runtime_error("wrong number of channels supplied to process");
        }

        // Array of per-channel pointers into the request's own input
        // buffers. Held in a vector so that it is released on every
        // exit path, including an exception thrown from process().
        vector<const float *> fbuffers(channels);
        for (int i = 0; i < channels; ++i) {
            if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
                throw runtime_error("wrong block size supplied to process");
            }
            fbuffers[i] = preq.inputBuffers[i].data();
        }

        response.processResponse.plugin = preq.plugin;
        response.processResponse.features =
            preq.plugin->process(fbuffers.data(), preq.timestamp);
        response.success = true;
        break;
    }

    case RRType::Finish:
    {
        response.finishResponse.plugin = request.finishRequest.plugin;
        response.finishResponse.features =
            request.finishRequest.plugin->getRemainingFeatures();

        // We do not delete the plugin here -- we need it in the
        // mapper when converting the features. It gets deleted by the
        // caller.

        response.success = true;
        break;
    }

    case RRType::NotValid:
        break;
    }

    return response;
}

/**
 * Entry point: read requests from stdin, handle each one and write
 * its response to stdout, until end of input.
 *
 * Takes no command-line arguments; any argument prints the usage
 * text. Exits with status 0 at end of input, or with status 1 after
 * reporting the first exception raised while reading, handling or
 * writing.
 */
int main(int argc, char **argv)
{
    if (argc != 1) {
        usage();
    }

    while (true) {

        // Declared outside the try block so that the request type is
        // still available when reporting an exception.
        RequestOrResponse request;

        try {

            request = readRequestCapnp();

            cerr << "vampipe-server: request received, of type "
                 << int(request.type)
                 << endl;

            // NotValid without an exception indicates EOF:
            if (request.type == RRType::NotValid) {
                cerr << "vampipe-server: eof reached" << endl;
                break;
            }

            RequestOrResponse response = handleRequest(request);

            cerr << "vampipe-server: request handled, writing response"
                 << endl;

            writeResponseCapnp(response);

            cerr << "vampipe-server: response written" << endl;

            // The plugin can only be released once the response has
            // been written, as writing it involves the mapper.
            if (request.type == RRType::Finish) {
                auto h = mapper.pluginToHandle(request.finishRequest.plugin);
                mapper.removePlugin(h);
                delete request.finishRequest.plugin;
            }

        } catch (const std::exception &e) {

            cerr << "vampipe-server: error: " << e.what() << endl;

            writeExceptionCapnp(e, request.type);

            exit(1);
        }
    }

    exit(0);
}