Mercurial > hg > piper-cpp
changeset 125:ea06fae1567c
Rename server to simple-server, and add some more description in usage
author | Chris Cannam <c.cannam@qmul.ac.uk> |
---|---|
date | Fri, 28 Oct 2016 11:08:17 +0100 |
parents | 4ff643c1eccc |
children | 2004ec2b653e |
files | Makefile test/test-server.sh vamp-client/ProcessQtTransport.h vamp-server/server.cpp vamp-server/simple-server.cpp |
diffstat | 5 files changed, 664 insertions(+), 644 deletions(-) [+] |
line wrap: on
line diff
--- a/Makefile Thu Oct 27 16:14:22 2016 +0100 +++ b/Makefile Fri Oct 28 11:08:17 2016 +0100 @@ -10,7 +10,7 @@ LDFLAGS += -ldl -all: o bin bin/piper-convert bin/piper-vamp-server +all: o bin bin/piper-convert bin/piper-vamp-simple-server bin: mkdir bin @@ -21,7 +21,7 @@ bin/piper-convert: o/convert.o o/json11.o o/piper.capnp.o c++ $(CXXFLAGS) $^ -o $@ $(LDFLAGS) -bin/piper-vamp-server: o/server.o o/json11.o o/piper.capnp.o +bin/piper-vamp-simple-server: o/simple-server.o o/json11.o o/piper.capnp.o c++ $(CXXFLAGS) $^ -o $@ $(LDFLAGS) #vamp-capnp/piper.capnp.h: $(PIPER_DIR)/capnp/piper.capnp @@ -36,7 +36,7 @@ o/convert.o: vamp-server/convert.cpp vamp-capnp/piper.capnp.h vamp-capnp/VampnProto.h vamp-json/VampJson.h c++ $(CXXFLAGS) $(INCFLAGS) -c $< -o $@ -o/server.o: vamp-server/server.cpp vamp-capnp/piper.capnp.h vamp-capnp/VampnProto.h vamp-json/VampJson.h +o/simple-server.o: vamp-server/simple-server.cpp vamp-capnp/piper.capnp.h vamp-capnp/VampnProto.h vamp-json/VampJson.h c++ $(CXXFLAGS) $(INCFLAGS) -c $< -o $@ test: all
--- a/test/test-server.sh Thu Oct 27 16:14:22 2016 +0100 +++ b/test/test-server.sh Fri Oct 28 11:08:17 2016 +0100 @@ -71,10 +71,10 @@ echo "$request" done | if [ "$format" = "json" ]; then - bin/piper-vamp-server -d json + bin/piper-vamp-simple-server -d json else bin/piper-convert request -i json -o capnp | - bin/piper-vamp-server -d capnp | + bin/piper-vamp-simple-server -d capnp | bin/piper-convert response -i capnp -o json fi | while read response ; do
--- a/vamp-client/ProcessQtTransport.h Thu Oct 27 16:14:22 2016 +0100 +++ b/vamp-client/ProcessQtTransport.h Fri Oct 28 11:08:17 2016 +0100 @@ -54,8 +54,11 @@ * using Qt's QProcess abstraction and talks to it via stdin/stdout * channels. Calls are completely serialized; the protocol only * supports one call in process at a time, and therefore the transport - * only allows one at a time. This class is thread-safe because it - * serializes explicitly using a mutex. + * only allows one at a time. + * + * This class is thread-safe, but in practice you can only use it from + * within a single thread, because the underlying QProcess does not + * support switching threads. */ class ProcessQtTransport : public SynchronousTransport { @@ -129,6 +132,7 @@ std::cerr << "writing " << size << " bytes to server" << std::endl; #endif m_process->write(ptr, size); + m_process->waitForBytesWritten(1000); std::vector<char> buffer; bool complete = false; @@ -142,6 +146,7 @@ std::cerr << "waiting for data from server..." << std::endl; #endif m_process->waitForReadyRead(1000); + if (m_process->state() == QProcess::NotRunning) { QProcess::ProcessError err = m_process->error(); if (err == QProcess::Crashed) {
--- a/vamp-server/server.cpp Thu Oct 27 16:14:22 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,637 +0,0 @@ -/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ -/* - Piper C++ - - An API for audio analysis and feature extraction plugins. - - Centre for Digital Music, Queen Mary, University of London. - Copyright 2006-2016 Chris Cannam and QMUL. - - Permission is hereby granted, free of charge, to any person - obtaining a copy of this software and associated documentation - files (the "Software"), to deal in the Software without - restriction, including without limitation the rights to use, copy, - modify, merge, publish, distribute, sublicense, and/or sell copies - of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be - included in all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR - ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF - CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION - WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - - Except as contained in this notice, the names of the Centre for - Digital Music; Queen Mary, University of London; and Chris Cannam - shall not be used in advertising or otherwise to promote the sale, - use or other dealings in this Software without prior written - authorization. -*/ - -#include "vamp-json/VampJson.h" -#include "vamp-capnp/VampnProto.h" -#include "vamp-support/RequestOrResponse.h" -#include "vamp-support/CountingPluginHandleMapper.h" -#include "vamp-support/LoaderRequests.h" - -#include <iostream> -#include <sstream> -#include <stdexcept> - -#include <capnp/serialize.h> - -#include <map> -#include <set> - -// pid for logging -#ifdef _WIN32 -#include <process.h> -static int pid = _getpid(); -#else -#include <unistd.h> -static int pid = getpid(); -#endif - -// for _setmode stuff -#ifdef _WIN32 -#include <io.h> -#include <fcntl.h> -#endif - -using namespace std; -using namespace json11; -using namespace piper_vamp; -using namespace Vamp; - -//!!! This could be faster and lighter: -// - Use Capnp structures directly rather than converting to vamp-support ones -// - Use Vamp C API (vamp.h) directly rather than converting to C++ -//!!! Doing the above for process() and finish() alone would be a good start - -static string myname = "piper-vamp-server"; - -static void version() -{ - cout << "1.0" << endl; - exit(0); -} - -static void usage(bool successful = false) -{ - cerr << "\n" << myname << - ": Load and run Vamp plugins in response to Piper messages\n\n" - " Usage: " << myname << " [-d] <format>\n" - " " << myname << " -v\n" - " " << myname << " -h\n\n" - " where\n" - " <format>: the format to read and write messages in (\"json\" or \"capnp\")\n" - " -d: also print debug information to stderr\n" - " -v: print version number to stdout and exit\n" - " -h: print this text to stderr and exit\n\n" - "Expects Piper request messages in either Cap'n Proto or JSON format on stdin,\n" - "and writes response messages in the same format to stdout.\n\n"; - if (successful) exit(0); - else exit(2); -} - -static CountingPluginHandleMapper mapper; - -static RequestOrResponse::RpcId -readId(const piper::RpcRequest::Reader &r) -{ - int number; - string tag; - switch (r.getId().which()) { - case piper::RpcRequest::Id::Which::NUMBER: - number = r.getId().getNumber(); - return { RequestOrResponse::RpcId::Number, number, "" }; - case piper::RpcRequest::Id::Which::TAG: - tag = r.getId().getTag(); - return { RequestOrResponse::RpcId::Tag, 0, tag }; - case piper::RpcRequest::Id::Which::NONE: - return { RequestOrResponse::RpcId::Absent, 0, "" }; - } - return {}; -} - -static void -buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id) -{ - switch (id.type) { - case RequestOrResponse::RpcId::Number: - b.getId().setNumber(id.number); - break; - case RequestOrResponse::RpcId::Tag: - b.getId().setTag(id.tag); - break; - case RequestOrResponse::RpcId::Absent: - b.getId().setNone(); - break; - } -} - -static RequestOrResponse::RpcId -readJsonId(const Json &j) -{ - RequestOrResponse::RpcId id; - - if (j["id"].is_number()) { - id.type = RequestOrResponse::RpcId::Number; - id.number = j["id"].number_value(); - } else if (j["id"].is_string()) { - id.type = RequestOrResponse::RpcId::Tag; - id.tag = j["id"].string_value(); - } else { - id.type = RequestOrResponse::RpcId::Absent; - } - - return id; -} - -static Json -writeJsonId(const RequestOrResponse::RpcId &id) -{ - if (id.type == RequestOrResponse::RpcId::Number) { - return id.number; - } else if (id.type == RequestOrResponse::RpcId::Tag) { - return id.tag; - } else { - return Json(); - } -} - -static Json -convertRequestJson(string input, string &err) -{ - Json j = Json::parse(input, err); - if (err != "") { - err = "invalid json: " + err; - return {}; - } - if (!j.is_object()) { - err = "object expected at top level"; - } else if (!j["method"].is_string()) { - err = "string expected for method field"; - } else if (!j["params"].is_null() && !j["params"].is_object()) { - err = "object expected for params field"; - } - return j; -} - -RequestOrResponse -readRequestJson(string &err) -{ - RequestOrResponse rr; - rr.direction = RequestOrResponse::Request; - - string input; - if (!getline(cin, input)) { - // the EOF case, not actually an error - rr.type = RRType::NotValid; - return rr; - } - - Json j = convertRequestJson(input, err); - if (err != "") return {}; - - rr.type = VampJson::getRequestResponseType(j, err); - if (err != "") return {}; - - rr.id = readJsonId(j); - - VampJson::BufferSerialisation serialisation = - VampJson::BufferSerialisation::Array; - - switch (rr.type) { - - case RRType::List: - VampJson::toRpcRequest_List(j, err); // type check only - break; - case RRType::Load: - rr.loadRequest = VampJson::toRpcRequest_Load(j, err); - break; - case RRType::Configure: - rr.configurationRequest = VampJson::toRpcRequest_Configure(j, mapper, err); - break; - case RRType::Process: - rr.processRequest = VampJson::toRpcRequest_Process(j, mapper, serialisation, err); - break; - case RRType::Finish: - rr.finishRequest = VampJson::toRpcRequest_Finish(j, mapper, err); - break; - case RRType::NotValid: - break; - } - - return rr; -} - -void -writeResponseJson(RequestOrResponse &rr, bool useBase64) -{ - Json j; - - VampJson::BufferSerialisation serialisation = - (useBase64 ? - VampJson::BufferSerialisation::Base64 : - VampJson::BufferSerialisation::Array); - - Json id = writeJsonId(rr.id); - - if (!rr.success) { - - j = VampJson::fromError(rr.errorText, rr.type, id); - - } else { - - switch (rr.type) { - - case RRType::List: - j = VampJson::fromRpcResponse_List(rr.listResponse, id); - break; - case RRType::Load: - j = VampJson::fromRpcResponse_Load(rr.loadResponse, mapper, id); - break; - case RRType::Configure: - j = VampJson::fromRpcResponse_Configure(rr.configurationResponse, - mapper, id); - break; - case RRType::Process: - j = VampJson::fromRpcResponse_Process - (rr.processResponse, mapper, serialisation, id); - break; - case RRType::Finish: - j = VampJson::fromRpcResponse_Finish - (rr.finishResponse, mapper, serialisation, id); - break; - case RRType::NotValid: - break; - } - } - - cout << j.dump() << endl; -} - -void -writeExceptionJson(const std::exception &e, RRType type) -{ - Json j = VampJson::fromError(e.what(), type, Json()); - cout << j.dump() << endl; -} - -RequestOrResponse -readRequestCapnp() -{ - RequestOrResponse rr; - rr.direction = RequestOrResponse::Request; - - static kj::FdInputStream stream(0); // stdin - static kj::BufferedInputStreamWrapper buffered(stream); - - if (buffered.tryGetReadBuffer() == nullptr) { - rr.type = RRType::NotValid; - return rr; - } - - capnp::InputStreamMessageReader message(buffered); - piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>(); - - rr.type = VampnProto::getRequestResponseType(reader); - rr.id = readId(reader); - - switch (rr.type) { - - case RRType::List: - VampnProto::readRpcRequest_List(reader); // type check only - break; - case RRType::Load: - VampnProto::readRpcRequest_Load(rr.loadRequest, reader); - break; - case RRType::Configure: - VampnProto::readRpcRequest_Configure(rr.configurationRequest, - reader, mapper); - break; - case RRType::Process: - VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); - break; - case RRType::Finish: - VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); - break; - case RRType::NotValid: - break; - } - - return rr; -} - -void -writeResponseCapnp(RequestOrResponse &rr) -{ - capnp::MallocMessageBuilder message; - piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>(); - - buildId(builder, rr.id); - - if (!rr.success) { - - VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); - - } else { - - switch (rr.type) { - - case RRType::List: - VampnProto::buildRpcResponse_List(builder, rr.listResponse); - break; - case RRType::Load: - VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); - break; - case RRType::Configure: - VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); - break; - case RRType::Process: - VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); - break; - case RRType::Finish: - VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); - break; - case RRType::NotValid: - break; - } - } - - writeMessageToFd(1, message); -} - -void -writeExceptionCapnp(const std::exception &e, RRType type) -{ - capnp::MallocMessageBuilder message; - piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>(); - VampnProto::buildRpcResponse_Exception(builder, e, type); - - writeMessageToFd(1, message); -} - -RequestOrResponse -handleRequest(const RequestOrResponse &request, bool debug) -{ - RequestOrResponse response; - response.direction = RequestOrResponse::Response; - response.type = request.type; - - switch (request.type) { - - case RRType::List: - response.listResponse = LoaderRequests().listPluginData(); - response.success = true; - break; - - case RRType::Load: - response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest); - if (response.loadResponse.plugin != nullptr) { - mapper.addPlugin(response.loadResponse.plugin); - if (debug) { - cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl; - } - response.success = true; - } - break; - - case RRType::Configure: - { - auto &creq = request.configurationRequest; - auto h = mapper.pluginToHandle(creq.plugin); - if (mapper.isConfigured(h)) { - throw runtime_error("plugin has already been configured"); - } - - response.configurationResponse = LoaderRequests().configurePlugin(creq); - - if (!response.configurationResponse.outputs.empty()) { - mapper.markConfigured - (h, creq.configuration.channelCount, creq.configuration.blockSize); - response.success = true; - } - break; - } - - case RRType::Process: - { - auto &preq = request.processRequest; - auto h = mapper.pluginToHandle(preq.plugin); - if (!mapper.isConfigured(h)) { - throw runtime_error("plugin has not been configured"); - } - - int channels = int(preq.inputBuffers.size()); - if (channels != mapper.getChannelCount(h)) { - throw runtime_error("wrong number of channels supplied to process"); - } - - const float **fbuffers = new const float *[channels]; - for (int i = 0; i < channels; ++i) { - if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { - delete[] fbuffers; - throw runtime_error("wrong block size supplied to process"); - } - fbuffers[i] = preq.inputBuffers[i].data(); - } - - response.processResponse.plugin = preq.plugin; - response.processResponse.features = - preq.plugin->process(fbuffers, preq.timestamp); - response.success = true; - - delete[] fbuffers; - break; - } - - case RRType::Finish: - { - auto &freq = request.finishRequest; - response.finishResponse.plugin = freq.plugin; - - auto h = mapper.pluginToHandle(freq.plugin); - // Finish can be called (to unload the plugin) even if the - // plugin has never been configured or used. But we want to - // make sure we call getRemainingFeatures only if we have - // actually configured the plugin. - if (mapper.isConfigured(h)) { - response.finishResponse.features = freq.plugin->getRemainingFeatures(); - } - - // We do not delete the plugin here -- we need it in the - // mapper when converting the features. It gets deleted in the - // calling function. - response.success = true; - break; - } - - case RRType::NotValid: - break; - } - - return response; -} - -RequestOrResponse -readRequest(string format) -{ - if (format == "capnp") { - return readRequestCapnp(); - } else if (format == "json") { - string err; - auto result = readRequestJson(err); - if (err != "") throw runtime_error(err); - else return result; - } else { - throw runtime_error("unknown input format \"" + format + "\""); - } -} - -void -writeResponse(string format, RequestOrResponse &rr) -{ - if (format == "capnp") { - writeResponseCapnp(rr); - } else if (format == "json") { - writeResponseJson(rr, false); - } else { - throw runtime_error("unknown output format \"" + format + "\""); - } -} - -void -writeException(string format, const std::exception &e, RRType type) -{ - if (format == "capnp") { - writeExceptionCapnp(e, type); - } else if (format == "json") { - writeExceptionJson(e, type); - } else { - throw runtime_error("unknown output format \"" + format + "\""); - } -} - -int main(int argc, char **argv) -{ - if (argc != 2 && argc != 3) { - usage(); - } - - bool debug = false; - - string arg = argv[1]; - if (arg == "-h") { - if (argc == 2) { - usage(true); - } else { - usage(); - } - } else if (arg == "-v") { - if (argc == 2) { - version(); - } else { - usage(); - } - } else if (arg == "-d") { - if (argc == 2) { - usage(); - } else { - debug = true; - arg = argv[2]; - } - } - - string format = arg; - - if (format != "capnp" && format != "json") { - usage(); - } - -#ifdef _WIN32 - if (format == "capnp") { - int result = _setmode(_fileno(stdin), _O_BINARY); - if (result == -1) { - cerr << "Failed to set binary mode on stdin, necessary for capnp format" << endl; - exit(1); - } - result = _setmode(_fileno(stdout), _O_BINARY); - if (result == -1) { - cerr << "Failed to set binary mode on stdout, necessary for capnp format" << endl; - exit(1); - } - } -#endif - - if (debug) { - cerr << myname << " " << pid << ": waiting for format: " << format << endl; - } - - while (true) { - - RequestOrResponse request; - - try { - - request = readRequest(format); - - // NotValid without an exception indicates EOF: - if (request.type == RRType::NotValid) { - if (debug) { - cerr << myname << " " << pid << ": eof reached, exiting" << endl; - } - break; - } - - if (debug) { - cerr << myname << " " << pid << ": request received, of type " - << int(request.type) - << endl; - } - - RequestOrResponse response = handleRequest(request, debug); - response.id = request.id; - - if (debug) { - cerr << myname << " " << pid << ": request handled, writing response" - << endl; - } - - writeResponse(format, response); - - if (debug) { - cerr << myname << " " << pid << ": response written" << endl; - } - - if (request.type == RRType::Finish) { - auto h = mapper.pluginToHandle(request.finishRequest.plugin); - if (debug) { - cerr << myname << " " << pid << ": deleting the plugin with handle " << h << endl; - } - mapper.removePlugin(h); - delete request.finishRequest.plugin; - } - - } catch (std::exception &e) { - - if (debug) { - cerr << myname << " " << pid << ": error: " << e.what() << endl; - } - - writeException(format, e, request.type); - - //!!! some exceptions should not be continued after, - //! but json/capnp parser ones should - //exit(1); - } - } - - exit(0); -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/vamp-server/simple-server.cpp Fri Oct 28 11:08:17 2016 +0100 @@ -0,0 +1,652 @@ +/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ +/* + Piper C++ + + An API for audio analysis and feature extraction plugins. + + Centre for Digital Music, Queen Mary, University of London. + Copyright 2006-2016 Chris Cannam and QMUL. + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this software and associated documentation + files (the "Software"), to deal in the Software without + restriction, including without limitation the rights to use, copy, + modify, merge, publish, distribute, sublicense, and/or sell copies + of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR + ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + Except as contained in this notice, the names of the Centre for + Digital Music; Queen Mary, University of London; and Chris Cannam + shall not be used in advertising or otherwise to promote the sale, + use or other dealings in this Software without prior written + authorization. +*/ + +#include "vamp-json/VampJson.h" +#include "vamp-capnp/VampnProto.h" +#include "vamp-support/RequestOrResponse.h" +#include "vamp-support/CountingPluginHandleMapper.h" +#include "vamp-support/LoaderRequests.h" + +#include <iostream> +#include <sstream> +#include <stdexcept> + +#include <capnp/serialize.h> + +#include <map> +#include <set> + +// pid for logging +#ifdef _WIN32 +#include <process.h> +static int pid = _getpid(); +#else +#include <unistd.h> +static int pid = getpid(); +#endif + +// for _setmode stuff +#ifdef _WIN32 +#include <io.h> +#include <fcntl.h> +#endif + +using namespace std; +using namespace json11; +using namespace piper_vamp; +using namespace Vamp; + +//!!! This could be faster and lighter: +// - Use Capnp structures directly rather than converting to vamp-support ones +// - Use Vamp C API (vamp.h) directly rather than converting to C++ +//!!! Doing the above for process() and finish() alone would be a good start + +static string myname = "piper-vamp-simple-server"; + +static void version() +{ + cout << "1.0" << endl; + exit(0); +} + +static void usage(bool successful = false) +{ + cerr << "\n" << myname << + ": Load & run Vamp plugins in response to Piper messages\n\n" + " Usage: " << myname << " [-d] <format>\n" + " " << myname << " -v\n" + " " << myname << " -h\n\n" + " where\n" + " <format>: the format to read and write messages in (\"json\" or \"capnp\")\n" + " -d: also print debug information to stderr\n" + " -v: print version number to stdout and exit\n" + " -h: print this text to stderr and exit\n\n" + "Expects Piper request messages in either Cap'n Proto or JSON format on stdin,\n" + "and writes response messages in the same format to stdout.\n\n" + "This server is intended for simple process separation. It's only suitable for\n" + "use with a single trusted client per server invocation.\n\n" + "The two formats behave differently in case of parser errors. JSON messages are\n" + "expected one per input line; because the JSON support is really intended for\n" + "interactive troubleshooting, any unparseable message is reported and discarded\n" + "and the server waits for another message. In contrast, because of the assumption\n" + "that the client is trusted and coupled to the server instance, a mangled\n" + "Cap'n Proto message causes the server to exit.\n\n"; + if (successful) exit(0); + else exit(2); +} + +static CountingPluginHandleMapper mapper; + +static RequestOrResponse::RpcId +readId(const piper::RpcRequest::Reader &r) +{ + int number; + string tag; + switch (r.getId().which()) { + case piper::RpcRequest::Id::Which::NUMBER: + number = r.getId().getNumber(); + return { RequestOrResponse::RpcId::Number, number, "" }; + case piper::RpcRequest::Id::Which::TAG: + tag = r.getId().getTag(); + return { RequestOrResponse::RpcId::Tag, 0, tag }; + case piper::RpcRequest::Id::Which::NONE: + return { RequestOrResponse::RpcId::Absent, 0, "" }; + } + return {}; +} + +static void +buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id) +{ + switch (id.type) { + case RequestOrResponse::RpcId::Number: + b.getId().setNumber(id.number); + break; + case RequestOrResponse::RpcId::Tag: + b.getId().setTag(id.tag); + break; + case RequestOrResponse::RpcId::Absent: + b.getId().setNone(); + break; + } +} + +static RequestOrResponse::RpcId +readJsonId(const Json &j) +{ + RequestOrResponse::RpcId id; + + if (j["id"].is_number()) { + id.type = RequestOrResponse::RpcId::Number; + id.number = j["id"].number_value(); + } else if (j["id"].is_string()) { + id.type = RequestOrResponse::RpcId::Tag; + id.tag = j["id"].string_value(); + } else { + id.type = RequestOrResponse::RpcId::Absent; + } + + return id; +} + +static Json +writeJsonId(const RequestOrResponse::RpcId &id) +{ + if (id.type == RequestOrResponse::RpcId::Number) { + return id.number; + } else if (id.type == RequestOrResponse::RpcId::Tag) { + return id.tag; + } else { + return Json(); + } +} + +static Json +convertRequestJson(string input, string &err) +{ + Json j = Json::parse(input, err); + if (err != "") { + err = "invalid json: " + err; + return {}; + } + if (!j.is_object()) { + err = "object expected at top level"; + } else if (!j["method"].is_string()) { + err = "string expected for method field"; + } else if (!j["params"].is_null() && !j["params"].is_object()) { + err = "object expected for params field"; + } + return j; +} + +RequestOrResponse +readRequestJson(string &err) +{ + RequestOrResponse rr; + rr.direction = RequestOrResponse::Request; + + string input; + if (!getline(cin, input)) { + // the EOF case, not actually an error + rr.type = RRType::NotValid; + return rr; + } + + Json j = convertRequestJson(input, err); + if (err != "") return {}; + + rr.type = VampJson::getRequestResponseType(j, err); + if (err != "") return {}; + + rr.id = readJsonId(j); + + VampJson::BufferSerialisation serialisation = + VampJson::BufferSerialisation::Array; + + switch (rr.type) { + + case RRType::List: + VampJson::toRpcRequest_List(j, err); // type check only + break; + case RRType::Load: + rr.loadRequest = VampJson::toRpcRequest_Load(j, err); + break; + case RRType::Configure: + rr.configurationRequest = VampJson::toRpcRequest_Configure(j, mapper, err); + break; + case RRType::Process: + rr.processRequest = VampJson::toRpcRequest_Process(j, mapper, serialisation, err); + break; + case RRType::Finish: + rr.finishRequest = VampJson::toRpcRequest_Finish(j, mapper, err); + break; + case RRType::NotValid: + break; + } + + return rr; +} + +void +writeResponseJson(RequestOrResponse &rr, bool useBase64) +{ + Json j; + + VampJson::BufferSerialisation serialisation = + (useBase64 ? + VampJson::BufferSerialisation::Base64 : + VampJson::BufferSerialisation::Array); + + Json id = writeJsonId(rr.id); + + if (!rr.success) { + + j = VampJson::fromError(rr.errorText, rr.type, id); + + } else { + + switch (rr.type) { + + case RRType::List: + j = VampJson::fromRpcResponse_List(rr.listResponse, id); + break; + case RRType::Load: + j = VampJson::fromRpcResponse_Load(rr.loadResponse, mapper, id); + break; + case RRType::Configure: + j = VampJson::fromRpcResponse_Configure(rr.configurationResponse, + mapper, id); + break; + case RRType::Process: + j = VampJson::fromRpcResponse_Process + (rr.processResponse, mapper, serialisation, id); + break; + case RRType::Finish: + j = VampJson::fromRpcResponse_Finish + (rr.finishResponse, mapper, serialisation, id); + break; + case RRType::NotValid: + break; + } + } + + cout << j.dump() << endl; +} + +void +writeExceptionJson(const std::exception &e, RRType type) +{ + Json j = VampJson::fromError(e.what(), type, Json()); + cout << j.dump() << endl; +} + +RequestOrResponse +readRequestCapnp() +{ + RequestOrResponse rr; + rr.direction = RequestOrResponse::Request; + + static kj::FdInputStream stream(0); // stdin + static kj::BufferedInputStreamWrapper buffered(stream); + + if (buffered.tryGetReadBuffer() == nullptr) { + rr.type = RRType::NotValid; + return rr; + } + + capnp::InputStreamMessageReader message(buffered); + piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>(); + + rr.type = VampnProto::getRequestResponseType(reader); + rr.id = readId(reader); + + switch (rr.type) { + + case RRType::List: + VampnProto::readRpcRequest_List(reader); // type check only + break; + case RRType::Load: + VampnProto::readRpcRequest_Load(rr.loadRequest, reader); + break; + case RRType::Configure: + VampnProto::readRpcRequest_Configure(rr.configurationRequest, + reader, mapper); + break; + case RRType::Process: + VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); + break; + case RRType::Finish: + VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); + break; + case RRType::NotValid: + break; + } + + return rr; +} + +void +writeResponseCapnp(RequestOrResponse &rr) +{ + capnp::MallocMessageBuilder message; + piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>(); + + buildId(builder, rr.id); + + if (!rr.success) { + + VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); + + } else { + + switch (rr.type) { + + case RRType::List: + VampnProto::buildRpcResponse_List(builder, rr.listResponse); + break; + case RRType::Load: + VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); + break; + case RRType::Configure: + VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); + break; + case RRType::Process: + VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); + break; + case RRType::Finish: + VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); + break; + case RRType::NotValid: + break; + } + } + + writeMessageToFd(1, message); +} + +void +writeExceptionCapnp(const std::exception &e, RRType type) +{ + capnp::MallocMessageBuilder message; + piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>(); + VampnProto::buildRpcResponse_Exception(builder, e, type); + + writeMessageToFd(1, message); +} + +RequestOrResponse +handleRequest(const RequestOrResponse &request, bool debug) +{ + RequestOrResponse response; + response.direction = RequestOrResponse::Response; + response.type = request.type; + + switch (request.type) { + + case RRType::List: + response.listResponse = LoaderRequests().listPluginData(); + response.success = true; + break; + + case RRType::Load: + response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest); + if (response.loadResponse.plugin != nullptr) { + mapper.addPlugin(response.loadResponse.plugin); + if (debug) { + cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl; + } + response.success = true; + } + break; + + case RRType::Configure: + { + auto &creq = request.configurationRequest; + auto h = mapper.pluginToHandle(creq.plugin); + if (mapper.isConfigured(h)) { + throw runtime_error("plugin has already been configured"); + } + + response.configurationResponse = LoaderRequests().configurePlugin(creq); + + if (!response.configurationResponse.outputs.empty()) { + mapper.markConfigured + (h, creq.configuration.channelCount, creq.configuration.blockSize); + response.success = true; + } + break; + } + + case RRType::Process: + { + auto &preq = request.processRequest; + auto h = mapper.pluginToHandle(preq.plugin); + if (!mapper.isConfigured(h)) { + throw runtime_error("plugin has not been configured"); + } + + int channels = int(preq.inputBuffers.size()); + if (channels != mapper.getChannelCount(h)) { + throw runtime_error("wrong number of channels supplied to process"); + } + + const float **fbuffers = new const float *[channels]; + for (int i = 0; i < channels; ++i) { + if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { + delete[] fbuffers; + throw runtime_error("wrong block size supplied to process"); + } + fbuffers[i] = preq.inputBuffers[i].data(); + } + + response.processResponse.plugin = preq.plugin; + response.processResponse.features = + preq.plugin->process(fbuffers, preq.timestamp); + response.success = true; + + delete[] fbuffers; + break; + } + + case RRType::Finish: + { + auto &freq = request.finishRequest; + response.finishResponse.plugin = freq.plugin; + + auto h = mapper.pluginToHandle(freq.plugin); + // Finish can be called (to unload the plugin) even if the + // plugin has never been configured or used. But we want to + // make sure we call getRemainingFeatures only if we have + // actually configured the plugin. + if (mapper.isConfigured(h)) { + response.finishResponse.features = freq.plugin->getRemainingFeatures(); + } + + // We do not delete the plugin here -- we need it in the + // mapper when converting the features. It gets deleted in the + // calling function. + response.success = true; + break; + } + + case RRType::NotValid: + break; + } + + return response; +} + +RequestOrResponse +readRequest(string format) +{ + if (format == "capnp") { + return readRequestCapnp(); + } else if (format == "json") { + string err; + auto result = readRequestJson(err); + if (err != "") throw runtime_error(err); + else return result; + } else { + throw runtime_error("unknown input format \"" + format + "\""); + } +} + +void +writeResponse(string format, RequestOrResponse &rr) +{ + if (format == "capnp") { + writeResponseCapnp(rr); + } else if (format == "json") { + writeResponseJson(rr, false); + } else { + throw runtime_error("unknown output format \"" + format + "\""); + } +} + +void +writeException(string format, const std::exception &e, RRType type) +{ + if (format == "capnp") { + writeExceptionCapnp(e, type); + } else if (format == "json") { + writeExceptionJson(e, type); + } else { + throw runtime_error("unknown output format \"" + format + "\""); + } +} + +int main(int argc, char **argv) +{ + if (argc != 2 && argc != 3) { + usage(); + } + + bool debug = false; + + string arg = argv[1]; + if (arg == "-h") { + if (argc == 2) { + usage(true); + } else { + usage(); + } + } else if (arg == "-v") { + if (argc == 2) { + version(); + } else { + usage(); + } + } else if (arg == "-d") { + if (argc == 2) { + usage(); + } else { + debug = true; + arg = argv[2]; + } + } + + string format = arg; + + if (format != "capnp" && format != "json") { + usage(); + } + +#ifdef _WIN32 + if (format == "capnp") { + int result = _setmode(_fileno(stdin), _O_BINARY); + if (result == -1) { + cerr << "Failed to set binary mode on stdin, necessary for capnp format" << endl; + exit(1); + } + result = _setmode(_fileno(stdout), _O_BINARY); + if (result == -1) { + cerr << "Failed to set binary mode on stdout, necessary for capnp format" << endl; + exit(1); + } + } +#endif + + if (debug) { + cerr << myname << " " << pid << ": waiting for format: " << format << endl; + } + + while (true) { + + RequestOrResponse request; + + try { + + request = readRequest(format); + + // NotValid without an exception indicates EOF: + if (request.type == RRType::NotValid) { + if (debug) { + cerr << myname << " " << pid << ": eof reached, exiting" << endl; + } + break; + } + + if (debug) { + cerr << myname << " " << pid << ": request received, of type " + << int(request.type) + << endl; + } + + RequestOrResponse response = handleRequest(request, debug); + response.id = request.id; + + if (debug) { + cerr << myname << " " << pid << ": request handled, writing response" + << endl; + } + + writeResponse(format, response); + + if (debug) { + cerr << myname << " " << pid << ": response written" << endl; + } + + if (request.type == RRType::Finish) { + auto h = mapper.pluginToHandle(request.finishRequest.plugin); + if (debug) { + cerr << myname << " " << pid << ": deleting the plugin with handle " << h << endl; + } + mapper.removePlugin(h); + delete request.finishRequest.plugin; + } + + } catch (std::exception &e) { + + if (debug) { + cerr << myname << " " << pid << ": error: " << e.what() << endl; + } + + writeException(format, e, request.type); + + if (format == "capnp") { + // Don't try to continue; we can't recover from a + // mangled input stream. However, we can return a + // successful error code because we are reporting the + // status in our Capnp output stream instead + if (debug) { + cerr << myname << " " << pid << ": not attempting to recover from capnp parse problems, exiting" << endl; + } + exit(0); + } + } + } + + exit(0); +}