cannam@111: /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */ c@118: /* c@118: Piper C++ c@118: c@118: An API for audio analysis and feature extraction plugins. c@118: c@118: Centre for Digital Music, Queen Mary, University of London. c@118: Copyright 2006-2016 Chris Cannam and QMUL. c@118: c@118: Permission is hereby granted, free of charge, to any person c@118: obtaining a copy of this software and associated documentation c@118: files (the "Software"), to deal in the Software without c@118: restriction, including without limitation the rights to use, copy, c@118: modify, merge, publish, distribute, sublicense, and/or sell copies c@118: of the Software, and to permit persons to whom the Software is c@118: furnished to do so, subject to the following conditions: c@118: c@118: The above copyright notice and this permission notice shall be c@118: included in all copies or substantial portions of the Software. c@118: c@118: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, c@118: EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF c@118: MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND c@118: NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR c@118: ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF c@118: CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION c@118: WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. c@118: c@118: Except as contained in this notice, the names of the Centre for c@118: Digital Music; Queen Mary, University of London; and Chris Cannam c@118: shall not be used in advertising or otherwise to promote the sale, c@118: use or other dealings in this Software without prior written c@118: authorization. c@118: */ c@96: c@96: #ifndef PIPER_CAPNP_CLIENT_H c@96: #define PIPER_CAPNP_CLIENT_H c@96: c@96: #include "Loader.h" c@96: #include "PluginClient.h" cannam@208: #include "PiperVampPlugin.h" c@96: #include "SynchronousTransport.h" c@96: c@96: #include "vamp-support/AssignedPluginHandleMapper.h" c@96: #include "vamp-capnp/VampnProto.h" c@96: c@134: #include c@134: c@96: #include c@96: c@144: //#define LOG_ENTRYPOINTS 1 c@142: c@142: #ifdef LOG_ENTRYPOINTS c@142: #define LOG_E(x) log(x) c@142: #else c@142: #define LOG_E(x) c@142: #endif c@142: c@97: namespace piper_vamp { c@97: namespace client { c@96: c@100: /** c@100: * Client for a request-response Piper server, i.e. using the c@100: * RpcRequest/RpcResponse structures with a single process call rather c@100: * than having individual RPC methods, with a synchronous transport c@100: * such as a subprocess pipe arrangement. Only one request can be c@100: * handled at a time. This class is thread-safe if and only if it is c@100: * constructed with a thread-safe SynchronousTransport implementation. cannam@187: * cannam@187: * This class takes Vamp-like structures (Plugin and the classes in cannam@187: * vamp-support) and uses them to communicate with a Piper server cannam@187: * using the Cap'n Proto serialisation of the Piper API. The transport cannam@187: * layer (and thus the nature of the server) is defined by the cannam@187: * SynchronousTransport passed to the constructor. cannam@187: * cannam@187: * This class implements both the Loader interface (which constructs cannam@187: * PluginStub objects) and the PluginClient (which accepts PluginStubs cannam@187: * and maps them into Piper handles). c@100: */ c@96: class CapnpRRClient : public PluginClient, c@118: public Loader c@96: { c@96: // unsigned to avoid undefined behaviour on possible wrap c@96: typedef uint32_t ReqId; c@96: c@96: class CompletenessChecker : public MessageCompletenessChecker { c@96: public: c@146: State check(const std::vector &message) const override { c@146: c@147: if (message.size() < sizeof(capnp::word)) { c@147: return Incomplete; c@147: } c@147: c@96: auto karr = toKJArray(message); c@96: size_t words = karr.size(); c@96: size_t expected = capnp::expectedSizeInWordsFromPrefix(karr); c@146: c@146: // Lacking a way to definitively check whether a message c@146: // is valid or not, we would still like to trap obvious c@146: // cases where a programming mistake results in garbage c@146: // being returned from the server. We impose a limit on c@146: // message size and, if a prefix is projected to exceed c@146: // that limit, call it invalid. If an extractor wants to c@146: // return a feature set greater than a gigaword in size, c@146: // it'll just have to do it across multiple process calls. c@146: size_t limit = size_t(1) << 30; c@146: c@145: // cerr << "CompletenessChecker: message.size() = " << message.size() c@146: // << ", words = " << words << ", limit = " << limit << ", expected = " << expected << endl; c@146: c@96: if (words > expected) { c@96: std::cerr << "WARNING: obtained more data than expected (" c@96: << words << " " << sizeof(capnp::word) c@96: << "-byte words, expected " c@96: << expected << ")" << std::endl; c@146: return Complete; c@146: } else if (words == expected) { c@146: return Complete; c@146: } else if (expected > limit) { c@147: std::cerr << "WARNING: apparently invalid message prefix: have " c@147: << words << " words in prefix, projected message size is " c@147: << expected << " against limit of " << limit << std::endl; c@146: return Invalid; c@146: } else { c@146: return Incomplete; c@96: } c@96: } c@96: }; c@96: c@96: public: c@134: CapnpRRClient(SynchronousTransport *transport, //!!! ownership? shared ptr? c@134: LogCallback *logger) : // logger may be nullptr for cerr c@134: m_logger(logger), c@96: m_transport(transport), c@96: m_completenessChecker(new CompletenessChecker) { c@96: transport->setCompletenessChecker(m_completenessChecker); c@96: } c@96: c@96: ~CapnpRRClient() { c@96: delete m_completenessChecker; c@96: } c@96: c@96: //!!! obviously, factor out all repetitive guff c@96: c@96: //!!! list and load are supposed to be called by application code, c@96: //!!! but the rest are only supposed to be called by the plugin -- c@96: //!!! sort out the api here c@96: c@96: // Loader methods: c@96: c@97: ListResponse cannam@207: list(const ListRequest &req) override { c@96: c@142: LOG_E("CapnpRRClient::listPluginData called"); c@142: cannam@170: checkServerOK(); cannam@170: c@96: capnp::MallocMessageBuilder message; c@118: piper::RpcRequest::Builder builder = message.initRoot(); c@131: VampnProto::buildRpcRequest_List(builder, req); c@96: ReqId id = getId(); c@96: builder.getId().setNumber(id); c@96: c@134: auto karr = call(message, "list", true); c@96: c@96: capnp::FlatArrayMessageReader responseMessage(karr); c@97: piper::RpcResponse::Reader reader = responseMessage.getRoot(); c@96: c@97: checkResponseType(reader, piper::RpcResponse::Response::Which::LIST, id); c@96: c@97: ListResponse lr; c@96: VampnProto::readListResponse(lr, reader.getResponse().getList()); c@142: c@142: LOG_E("CapnpRRClient::listPluginData returning"); c@142: c@96: return lr; c@96: } c@96: c@97: LoadResponse cannam@207: load(const LoadRequest &req) override { c@96: c@142: LOG_E("CapnpRRClient::loadPlugin called"); c@142: cannam@170: checkServerOK(); cannam@170: c@97: LoadResponse resp; c@96: PluginHandleMapper::Handle handle = serverLoad(req.pluginKey, c@96: req.inputSampleRate, c@96: req.adapterFlags, c@96: resp.staticData, cannam@289: resp.defaultConfiguration, cannam@289: resp.programParameters); c@96: cannam@208: Vamp::Plugin *plugin = new PiperVampPlugin(this, cannam@208: req.pluginKey, cannam@208: req.inputSampleRate, cannam@208: req.adapterFlags, cannam@208: resp.staticData, cannam@288: resp.defaultConfiguration, cannam@288: resp.programParameters); c@96: c@96: m_mapper.addPlugin(handle, plugin); c@96: c@96: resp.plugin = plugin; c@142: c@142: LOG_E("CapnpRRClient::loadPlugin returning"); c@142: c@96: return resp; c@96: } c@96: c@96: // PluginClient methods: c@96: c@96: virtual cannam@185: ConfigurationResponse cannam@208: configure(PiperVampPlugin *plugin, c@97: PluginConfiguration config) override { c@96: c@142: LOG_E("CapnpRRClient::configure called"); cannam@170: cannam@170: checkServerOK(); c@142: c@97: ConfigurationRequest request; c@96: request.plugin = plugin; c@96: request.configuration = config; c@96: c@96: capnp::MallocMessageBuilder message; c@97: piper::RpcRequest::Builder builder = message.initRoot(); c@96: c@96: VampnProto::buildRpcRequest_Configure(builder, request, m_mapper); c@96: ReqId id = getId(); c@96: builder.getId().setNumber(id); c@96: c@134: auto karr = call(message, "configure", true); c@96: c@96: capnp::FlatArrayMessageReader responseMessage(karr); c@97: piper::RpcResponse::Reader reader = responseMessage.getRoot(); c@96: c@97: checkResponseType(reader, piper::RpcResponse::Response::Which::CONFIGURE, id); c@96: c@97: ConfigurationResponse cr; c@96: VampnProto::readConfigurationResponse(cr, c@96: reader.getResponse().getConfigure(), c@96: m_mapper); c@96: c@142: LOG_E("CapnpRRClient::configure returning"); c@142: cannam@185: return cr; c@96: }; c@96: c@96: virtual c@96: Vamp::Plugin::FeatureSet cannam@208: process(PiperVampPlugin *plugin, c@96: std::vector > inputBuffers, c@96: Vamp::RealTime timestamp) override { c@96: c@142: LOG_E("CapnpRRClient::process called"); c@142: cannam@170: checkServerOK(); cannam@170: c@97: ProcessRequest request; c@96: request.plugin = plugin; c@96: request.inputBuffers = inputBuffers; c@96: request.timestamp = timestamp; c@96: c@96: capnp::MallocMessageBuilder message; c@97: piper::RpcRequest::Builder builder = message.initRoot(); c@96: VampnProto::buildRpcRequest_Process(builder, request, m_mapper); c@118: ReqId id = getId(); c@96: builder.getId().setNumber(id); c@96: c@134: auto karr = call(message, "process", false); c@96: c@96: capnp::FlatArrayMessageReader responseMessage(karr); c@97: piper::RpcResponse::Reader reader = responseMessage.getRoot(); c@96: c@97: checkResponseType(reader, piper::RpcResponse::Response::Which::PROCESS, id); c@96: c@97: ProcessResponse pr; c@96: VampnProto::readProcessResponse(pr, c@96: reader.getResponse().getProcess(), c@96: m_mapper); c@96: c@142: LOG_E("CapnpRRClient::process returning"); c@142: c@96: return pr.features; c@96: } c@96: c@96: virtual Vamp::Plugin::FeatureSet cannam@208: finish(PiperVampPlugin *plugin) override { c@96: c@142: LOG_E("CapnpRRClient::finish called"); c@142: cannam@170: checkServerOK(); cannam@170: c@97: FinishRequest request; c@96: request.plugin = plugin; c@96: c@96: capnp::MallocMessageBuilder message; c@97: piper::RpcRequest::Builder builder = message.initRoot(); c@96: c@96: VampnProto::buildRpcRequest_Finish(builder, request, m_mapper); c@96: ReqId id = getId(); c@96: builder.getId().setNumber(id); c@96: c@134: auto karr = call(message, "finish", true); c@96: c@96: capnp::FlatArrayMessageReader responseMessage(karr); c@97: piper::RpcResponse::Reader reader = responseMessage.getRoot(); c@96: c@97: checkResponseType(reader, piper::RpcResponse::Response::Which::FINISH, id); c@96: c@97: FinishResponse pr; c@96: VampnProto::readFinishResponse(pr, c@96: reader.getResponse().getFinish(), c@96: m_mapper); c@96: c@96: m_mapper.removePlugin(m_mapper.pluginToHandle(plugin)); c@96: c@118: // Don't delete the plugin. It's the plugin that is supposed c@118: // to be calling us here c@96: c@142: LOG_E("CapnpRRClient::finish returning"); c@142: c@96: return pr.features; c@96: } c@96: c@96: virtual void cannam@208: reset(PiperVampPlugin *plugin, c@97: PluginConfiguration config) override { c@96: c@96: // Reload the plugin on the server side, and configure it as requested c@134: c@134: log("CapnpRRClient: reset() called, plugin will be closed and reloaded"); c@96: cannam@170: checkServerOK(); cannam@170: c@96: if (m_mapper.havePlugin(plugin)) { c@96: (void)finish(plugin); // server-side unload c@96: } c@96: c@97: PluginStaticData psd; c@97: PluginConfiguration defaultConfig; cannam@289: PluginProgramParameters programParameters; c@96: PluginHandleMapper::Handle handle = c@96: serverLoad(plugin->getPluginKey(), c@96: plugin->getInputSampleRate(), c@96: plugin->getAdapterFlags(), cannam@289: psd, cannam@289: defaultConfig, cannam@289: programParameters); c@96: c@96: m_mapper.addPlugin(handle, plugin); c@96: c@96: (void)configure(plugin, config); c@96: } c@96: c@96: private: c@96: AssignedPluginHandleMapper m_mapper; c@96: ReqId getId() { c@96: //!!! todo: mutex c@96: static ReqId m_nextId = 0; c@96: return m_nextId++; c@96: } c@96: c@96: static c@96: kj::Array c@96: toKJArray(const std::vector &buffer) { c@118: // We could do this whole thing with fewer copies, but let's c@118: // see whether it matters first c@96: size_t wordSize = sizeof(capnp::word); c@118: size_t words = buffer.size() / wordSize; c@118: kj::Array karr(kj::heapArray(words)); cannam@260: memcpy(reinterpret_cast(karr.begin()), cannam@260: buffer.data(), cannam@260: words * wordSize); c@118: return karr; c@96: } c@96: c@96: void cannam@170: checkServerOK() { cannam@170: if (!m_transport->isOK()) { cannam@170: log("Piper server crashed or failed to start (caller should have checked this)"); cannam@170: throw ServerCrashed(); cannam@170: } cannam@170: } cannam@170: cannam@170: /** cannam@170: * Check (i) that the response has the same id as supplied (which cannam@170: * presumably is the corresponding request id) and (ii) that the cannam@170: * response has the expected type. cannam@170: * cannam@170: * Return only if both of these things are the case. cannam@170: * cannam@170: * If the response has the right id but is an error response, cannam@170: * throw a ServiceError exception with the error response's cannam@170: * message in it. cannam@170: * cannam@170: * If the response has the wrong id, or if it has the wrong type cannam@170: * and is not an error response, throw ProtocolError. (i.e. for cannam@170: * cases having errors that are not conveyed through our official cannam@170: * error response.) cannam@170: */ cannam@170: void c@97: checkResponseType(const piper::RpcResponse::Reader &r, c@97: piper::RpcResponse::Response::Which type, c@96: ReqId id) { c@96: c@96: if (ReqId(r.getId().getNumber()) != id) { c@134: std::ostringstream s; c@134: s << "checkResponseType: wrong response id (received " c@134: << r.getId().getNumber() << ", expected " << id << ")"; c@134: log(s.str()); cannam@170: throw ProtocolError("Wrong response id"); cannam@170: } cannam@170: if (r.getResponse().which() != type) { cannam@170: if (r.getResponse().which() == piper::RpcResponse::Response::Which::ERROR) { cannam@170: int code; cannam@170: std::string message; cannam@170: VampnProto::readRpcResponse_Error(code, message, r); cannam@170: std::ostringstream s; cannam@170: s << "checkResponseType: received an error with message: " cannam@170: << message; cannam@170: log(s.str()); cannam@170: throw ServiceError(message); cannam@170: } else { cannam@170: std::ostringstream s; cannam@170: s << "checkResponseType: wrong response type (received " cannam@170: << int(r.getResponse().which()) << ", expected " << int(type) << ")"; cannam@170: log(s.str()); cannam@170: throw ProtocolError("Wrong response type"); cannam@170: } c@96: } c@96: } c@96: c@96: kj::Array c@134: call(capnp::MallocMessageBuilder &message, std::string type, bool slow) { c@96: auto arr = capnp::messageToFlatArray(message); c@96: auto responseBuffer = m_transport->call(arr.asChars().begin(), c@126: arr.asChars().size(), c@134: type, c@126: slow); c@118: return toKJArray(responseBuffer); c@96: } c@96: c@96: PluginHandleMapper::Handle c@96: serverLoad(std::string key, float inputSampleRate, int adapterFlags, c@97: PluginStaticData &psd, cannam@289: PluginConfiguration &defaultConfig, cannam@289: PluginProgramParameters &programParameters) { c@96: c@97: LoadRequest request; c@96: request.pluginKey = key; c@96: request.inputSampleRate = inputSampleRate; c@96: request.adapterFlags = adapterFlags; c@96: c@96: capnp::MallocMessageBuilder message; c@97: piper::RpcRequest::Builder builder = message.initRoot(); c@96: c@96: VampnProto::buildRpcRequest_Load(builder, request); c@96: ReqId id = getId(); c@96: builder.getId().setNumber(id); c@96: c@134: auto karr = call(message, "load", false); c@96: c@96: capnp::FlatArrayMessageReader responseMessage(karr); c@97: piper::RpcResponse::Reader reader = responseMessage.getRoot(); c@96: c@97: checkResponseType(reader, piper::RpcResponse::Response::Which::LOAD, id); c@96: c@97: const piper::LoadResponse::Reader &lr = reader.getResponse().getLoad(); c@96: VampnProto::readExtractorStaticData(psd, lr.getStaticData()); c@96: VampnProto::readConfiguration(defaultConfig, lr.getDefaultConfiguration()); cannam@289: for (auto pp: lr.getProgramParameters()) { cannam@289: VampnProto::readProgramParameterMap(programParameters, pp); cannam@289: } c@96: return lr.getHandle(); c@96: }; c@96: c@96: private: c@134: LogCallback *m_logger; c@96: SynchronousTransport *m_transport; //!!! I don't own this, but should I? c@96: CompletenessChecker *m_completenessChecker; // I own this c@134: c@134: void log(std::string message) const { c@134: if (m_logger) m_logger->log(message); c@134: else std::cerr << message << std::endl; c@134: } c@96: }; c@96: c@96: } c@96: } c@96: c@96: #endif