Mercurial > hg > piper-cpp
diff vamp-server/server.cpp @ 116:d15cb1151d76
Add JSON support directly to the server. Had hoped to avoid this (using Capnp as canonical in the server and then converting externally as necessary) but it's just too useful for debugging purposes when bundled with client app
author | Chris Cannam <c.cannam@qmul.ac.uk> |
---|---|
date | Thu, 27 Oct 2016 11:39:41 +0100 |
parents | b418b583fd3b |
children | ff3fd8d1b2dc |
line wrap: on
line diff
--- a/vamp-server/server.cpp Thu Oct 27 10:28:10 2016 +0100 +++ b/vamp-server/server.cpp Thu Oct 27 11:39:41 2016 +0100 @@ -1,4 +1,6 @@ +/* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ +#include "vamp-json/VampJson.h" #include "vamp-capnp/VampnProto.h" #include "vamp-support/RequestOrResponse.h" #include "vamp-support/CountingPluginHandleMapper.h" @@ -23,6 +25,7 @@ #endif using namespace std; +using namespace json11; using namespace piper_vamp; using namespace Vamp; @@ -31,21 +34,36 @@ // - Use Vamp C API (vamp.h) directly rather than converting to C++ //!!! Doing the above for process() and finish() alone would be a good start -void usage() +static string myname = "piper-vamp-server"; + +static void version() { - string myname = "piper-vamp-server"; + cout << "1.0" << endl; + exit(0); +} + +static void usage(bool successful = false) +{ cerr << "\n" << myname << - ": Load and run Vamp plugins in response to messages from stdin\n\n" - " Usage: " << myname << "\n\n" - "Expects Piper request messages in Cap'n Proto packed format on stdin,\n" - "and writes Piper response messages in the same format to stdout.\n\n"; - - exit(2); + ": Load and run Vamp plugins in response to Piper messages\n\n" + " Usage: " << myname << " [-d] <format>\n" + " " << myname << " -v\n" + " " << myname << " -h\n\n" + " where\n" + " <format>: the format to read and write messages in (\"json\" or \"capnp\")\n" + " -d: also print debug information to stderr\n" + " -v: print version number to stdout and exit\n" + " -h: print this text to stderr and exit\n\n" + "Expects Piper request messages in either Cap'n Proto or JSON format on stdin,\n" + "and writes response messages in the same format to stdout.\n\n"; + if (successful) exit(0); + else exit(2); } static CountingPluginHandleMapper mapper; -static RequestOrResponse::RpcId readId(const piper::RpcRequest::Reader &r) +static RequestOrResponse::RpcId +readId(const piper::RpcRequest::Reader &r) { int number; string tag; @@ -62,7 +80,8 @@ return {}; } -static void buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id) +static void +buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id) { switch (id.type) { case RequestOrResponse::RpcId::Number: @@ -77,6 +96,155 @@ } } +static RequestOrResponse::RpcId +readJsonId(const Json &j) +{ + RequestOrResponse::RpcId id; + + if (j["id"].is_number()) { + id.type = RequestOrResponse::RpcId::Number; + id.number = j["id"].number_value(); + } else if (j["id"].is_string()) { + id.type = RequestOrResponse::RpcId::Tag; + id.tag = j["id"].string_value(); + } else { + id.type = RequestOrResponse::RpcId::Absent; + } + + return id; +} + +static Json +writeJsonId(const RequestOrResponse::RpcId &id) +{ + if (id.type == RequestOrResponse::RpcId::Number) { + return id.number; + } else if (id.type == RequestOrResponse::RpcId::Tag) { + return id.tag; + } else { + return Json(); + } +} + +static Json +convertRequestJson(string input, string &err) +{ + Json j = Json::parse(input, err); + if (err != "") { + err = "invalid json: " + err; + return {}; + } + if (!j.is_object()) { + err = "object expected at top level"; + } else if (!j["method"].is_string()) { + err = "string expected for method field"; + } else if (!j["params"].is_null() && !j["params"].is_object()) { + err = "object expected for params field"; + } + return j; +} + +RequestOrResponse +readRequestJson(string &err) +{ + RequestOrResponse rr; + rr.direction = RequestOrResponse::Request; + + string input; + if (!getline(cin, input)) { + // the EOF case, not actually an error + rr.type = RRType::NotValid; + return rr; + } + + Json j = convertRequestJson(input, err); + if (err != "") return {}; + + rr.type = VampJson::getRequestResponseType(j, err); + if (err != "") return {}; + + rr.id = readJsonId(j); + + VampJson::BufferSerialisation serialisation = + VampJson::BufferSerialisation::Array; + + switch (rr.type) { + + case RRType::List: + VampJson::toRpcRequest_List(j, err); // type check only + break; + case RRType::Load: + rr.loadRequest = VampJson::toRpcRequest_Load(j, err); + break; + case RRType::Configure: + rr.configurationRequest = VampJson::toRpcRequest_Configure(j, mapper, err); + break; + case RRType::Process: + rr.processRequest = VampJson::toRpcRequest_Process(j, mapper, serialisation, err); + break; + case RRType::Finish: + rr.finishRequest = VampJson::toRpcRequest_Finish(j, mapper, err); + break; + case RRType::NotValid: + break; + } + + return rr; +} + +void +writeResponseJson(RequestOrResponse &rr, bool useBase64) +{ + Json j; + + VampJson::BufferSerialisation serialisation = + (useBase64 ? + VampJson::BufferSerialisation::Base64 : + VampJson::BufferSerialisation::Array); + + Json id = writeJsonId(rr.id); + + if (!rr.success) { + + j = VampJson::fromError(rr.errorText, rr.type, id); + + } else { + + switch (rr.type) { + + case RRType::List: + j = VampJson::fromRpcResponse_List(rr.listResponse, id); + break; + case RRType::Load: + j = VampJson::fromRpcResponse_Load(rr.loadResponse, mapper, id); + break; + case RRType::Configure: + j = VampJson::fromRpcResponse_Configure(rr.configurationResponse, + mapper, id); + break; + case RRType::Process: + j = VampJson::fromRpcResponse_Process + (rr.processResponse, mapper, serialisation, id); + break; + case RRType::Finish: + j = VampJson::fromRpcResponse_Finish + (rr.finishResponse, mapper, serialisation, id); + break; + case RRType::NotValid: + break; + } + } + + cout << j.dump() << endl; +} + +void +writeExceptionJson(const std::exception &e, RRType type) +{ + Json j = VampJson::fromError(e.what(), type, Json()); + cout << j.dump() << endl; +} + RequestOrResponse readRequestCapnp() { @@ -87,8 +255,8 @@ static kj::BufferedInputStreamWrapper buffered(stream); if (buffered.tryGetReadBuffer() == nullptr) { - rr.type = RRType::NotValid; - return rr; + rr.type = RRType::NotValid; + return rr; } capnp::InputStreamMessageReader message(buffered); @@ -100,23 +268,23 @@ switch (rr.type) { case RRType::List: - VampnProto::readRpcRequest_List(reader); // type check only - break; + VampnProto::readRpcRequest_List(reader); // type check only + break; case RRType::Load: - VampnProto::readRpcRequest_Load(rr.loadRequest, reader); - break; + VampnProto::readRpcRequest_Load(rr.loadRequest, reader); + break; case RRType::Configure: - VampnProto::readRpcRequest_Configure(rr.configurationRequest, - reader, mapper); - break; + VampnProto::readRpcRequest_Configure(rr.configurationRequest, + reader, mapper); + break; case RRType::Process: - VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); - break; + VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); + break; case RRType::Finish: - VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); - break; + VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); + break; case RRType::NotValid: - break; + break; } return rr; @@ -132,30 +300,30 @@ if (!rr.success) { - VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); + VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); } else { - - switch (rr.type) { + + switch (rr.type) { - case RRType::List: - VampnProto::buildRpcResponse_List(builder, rr.listResponse); - break; - case RRType::Load: - VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); - break; - case RRType::Configure: - VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); - break; - case RRType::Process: - VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); - break; - case RRType::Finish: - VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); - break; - case RRType::NotValid: - break; - } + case RRType::List: + VampnProto::buildRpcResponse_List(builder, rr.listResponse); + break; + case RRType::Load: + VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); + break; + case RRType::Configure: + VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); + break; + case RRType::Process: + VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); + break; + case RRType::Finish: + VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); + break; + case RRType::NotValid: + break; + } } writeMessageToFd(1, message); @@ -172,7 +340,7 @@ } RequestOrResponse -handleRequest(const RequestOrResponse &request) +handleRequest(const RequestOrResponse &request, bool debug) { RequestOrResponse response; response.direction = RequestOrResponse::Response; @@ -181,147 +349,232 @@ switch (request.type) { case RRType::List: - response.listResponse = LoaderRequests().listPluginData(); - response.success = true; - break; + response.listResponse = LoaderRequests().listPluginData(); + response.success = true; + break; case RRType::Load: - response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest); - if (response.loadResponse.plugin != nullptr) { - mapper.addPlugin(response.loadResponse.plugin); - cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl; - response.success = true; - } - break; - + response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest); + if (response.loadResponse.plugin != nullptr) { + mapper.addPlugin(response.loadResponse.plugin); + if (debug) { + cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl; + } + response.success = true; + } + break; + case RRType::Configure: { - auto &creq = request.configurationRequest; - auto h = mapper.pluginToHandle(creq.plugin); - if (mapper.isConfigured(h)) { - throw runtime_error("plugin has already been configured"); - } + auto &creq = request.configurationRequest; + auto h = mapper.pluginToHandle(creq.plugin); + if (mapper.isConfigured(h)) { + throw runtime_error("plugin has already been configured"); + } - response.configurationResponse = LoaderRequests().configurePlugin(creq); - - if (!response.configurationResponse.outputs.empty()) { - mapper.markConfigured - (h, creq.configuration.channelCount, creq.configuration.blockSize); - response.success = true; - } - break; + response.configurationResponse = LoaderRequests().configurePlugin(creq); + + if (!response.configurationResponse.outputs.empty()) { + mapper.markConfigured + (h, creq.configuration.channelCount, creq.configuration.blockSize); + response.success = true; + } + break; } case RRType::Process: { - auto &preq = request.processRequest; - auto h = mapper.pluginToHandle(preq.plugin); - if (!mapper.isConfigured(h)) { - throw runtime_error("plugin has not been configured"); - } + auto &preq = request.processRequest; + auto h = mapper.pluginToHandle(preq.plugin); + if (!mapper.isConfigured(h)) { + throw runtime_error("plugin has not been configured"); + } - int channels = int(preq.inputBuffers.size()); - if (channels != mapper.getChannelCount(h)) { - throw runtime_error("wrong number of channels supplied to process"); - } - - const float **fbuffers = new const float *[channels]; - for (int i = 0; i < channels; ++i) { - if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { - delete[] fbuffers; - throw runtime_error("wrong block size supplied to process"); - } - fbuffers[i] = preq.inputBuffers[i].data(); - } + int channels = int(preq.inputBuffers.size()); + if (channels != mapper.getChannelCount(h)) { + throw runtime_error("wrong number of channels supplied to process"); + } + + const float **fbuffers = new const float *[channels]; + for (int i = 0; i < channels; ++i) { + if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { + delete[] fbuffers; + throw runtime_error("wrong block size supplied to process"); + } + fbuffers[i] = preq.inputBuffers[i].data(); + } - response.processResponse.plugin = preq.plugin; - response.processResponse.features = - preq.plugin->process(fbuffers, preq.timestamp); - response.success = true; + response.processResponse.plugin = preq.plugin; + response.processResponse.features = + preq.plugin->process(fbuffers, preq.timestamp); + response.success = true; - delete[] fbuffers; - break; + delete[] fbuffers; + break; } case RRType::Finish: { - auto &freq = request.finishRequest; - response.finishResponse.plugin = freq.plugin; + auto &freq = request.finishRequest; + response.finishResponse.plugin = freq.plugin; - auto h = mapper.pluginToHandle(freq.plugin); + auto h = mapper.pluginToHandle(freq.plugin); // Finish can be called (to unload the plugin) even if the // plugin has never been configured or used. But we want to // make sure we call getRemainingFeatures only if we have // actually configured the plugin. - if (mapper.isConfigured(h)) { + if (mapper.isConfigured(h)) { response.finishResponse.features = freq.plugin->getRemainingFeatures(); - } + } - // We do not delete the plugin here -- we need it in the - // mapper when converting the features. It gets deleted in the - // calling function. - response.success = true; - break; + // We do not delete the plugin here -- we need it in the + // mapper when converting the features. It gets deleted in the + // calling function. + response.success = true; + break; } case RRType::NotValid: - break; + break; } return response; } -int main(int argc, char **) +RequestOrResponse +readRequest(string format) { - if (argc != 1) { - usage(); + if (format == "capnp") { + return readRequestCapnp(); + } else if (format == "json") { + string err; + auto result = readRequestJson(err); + if (err != "") throw runtime_error(err); + else return result; + } else { + throw runtime_error("unknown input format \"" + format + "\""); + } +} + +void +writeResponse(string format, RequestOrResponse &rr) +{ + if (format == "capnp") { + writeResponseCapnp(rr); + } else if (format == "json") { + writeResponseJson(rr, false); + } else { + throw runtime_error("unknown output format \"" + format + "\""); + } +} + +void +writeException(string format, const std::exception &e, RRType type) +{ + if (format == "capnp") { + writeExceptionCapnp(e, type); + } else if (format == "json") { + writeExceptionJson(e, type); + } else { + throw runtime_error("unknown output format \"" + format + "\""); + } +} + +int main(int argc, char **argv) +{ + if (argc != 2 && argc != 3) { + usage(); } - cerr << "piper-vamp-server: ready" << endl; + bool debug = false; + string arg = argv[1]; + if (arg == "-h") { + if (argc == 2) { + usage(true); + } else { + usage(); + } + } else if (arg == "-v") { + if (argc == 2) { + version(); + } else { + usage(); + } + } else if (arg == "-d") { + if (argc == 2) { + usage(); + } else { + debug = true; + arg = argv[2]; + } + } + + string format = arg; + + if (format != "capnp" && format != "json") { + usage(); + } + + if (debug) { + cerr << myname << " " << pid << ": waiting for format: " << format << endl; + } + while (true) { - RequestOrResponse request; - - try { + RequestOrResponse request; + + try { - request = readRequestCapnp(); + request = readRequest(format); + + // NotValid without an exception indicates EOF: + if (request.type == RRType::NotValid) { + if (debug) { + cerr << myname << " " << pid << ": eof reached, exiting" << endl; + } + break; + } - cerr << "piper-vamp-server " << pid << ": request received, of type " - << int(request.type) - << endl; - - // NotValid without an exception indicates EOF: - if (request.type == RRType::NotValid) { - cerr << "piper-vamp-server " << pid << ": eof reached, exiting" << endl; - break; - } + if (debug) { + cerr << myname << " " << pid << ": request received, of type " + << int(request.type) + << endl; + } - RequestOrResponse response = handleRequest(request); + RequestOrResponse response = handleRequest(request, debug); response.id = request.id; - cerr << "piper-vamp-server " << pid << ": request handled, writing response" - << endl; - - writeResponseCapnp(response); + if (debug) { + cerr << myname << " " << pid << ": request handled, writing response" + << endl; + } + + writeResponse(format, response); - cerr << "piper-vamp-server " << pid << ": response written" << endl; + if (debug) { + cerr << myname << " " << pid << ": response written" << endl; + } - if (request.type == RRType::Finish) { - auto h = mapper.pluginToHandle(request.finishRequest.plugin); - cerr << "piper-vamp-server " << pid << ": deleting the plugin with handle " << h << endl; - mapper.removePlugin(h); - delete request.finishRequest.plugin; - } - - } catch (std::exception &e) { + if (request.type == RRType::Finish) { + auto h = mapper.pluginToHandle(request.finishRequest.plugin); + if (debug) { + cerr << myname << " " << pid << ": deleting the plugin with handle " << h << endl; + } + mapper.removePlugin(h); + delete request.finishRequest.plugin; + } + + } catch (std::exception &e) { - cerr << "piper-vamp-server " << pid << ": error: " << e.what() << endl; + if (debug) { + cerr << myname << " " << pid << ": error: " << e.what() << endl; + } - writeExceptionCapnp(e, request.type); - - exit(1); - } + writeException(format, e, request.type); + + exit(1); + } } exit(0);