c@75: c@75: #include "vamp-capnp/VampnProto.h" c@75: #include "vamp-support/RequestOrResponse.h" c@75: #include "vamp-support/CountingPluginHandleMapper.h" c@75: c@75: #include c@75: #include c@75: #include c@75: c@75: #include c@75: #include c@75: c@75: using namespace std; c@75: using namespace piper; c@75: using namespace Vamp; c@75: using namespace Vamp::HostExt; c@75: c@75: void usage() c@75: { c@75: string myname = "piper-vamp-server"; c@75: cerr << "\n" << myname << c@75: ": Load and run Vamp plugins in response to messages from stdin\n\n" c@75: " Usage: " << myname << "\n\n" c@75: "Expects Piper request messages in Cap'n Proto packed format on stdin,\n" c@75: "and writes Piper response messages in the same format to stdout.\n\n"; c@75: c@75: exit(2); c@75: } c@75: c@75: static CountingPluginHandleMapper mapper; c@75: c@75: static RequestOrResponse::RpcId readId(const RpcRequest::Reader &r) c@75: { c@75: int number; c@75: string tag; c@75: switch (r.getId().which()) { c@75: case RpcRequest::Id::Which::NUMBER: c@75: number = r.getId().getNumber(); c@75: return { RequestOrResponse::RpcId::Number, number, "" }; c@75: case RpcRequest::Id::Which::TAG: c@75: tag = r.getId().getTag(); c@75: return { RequestOrResponse::RpcId::Tag, 0, tag }; c@75: case RpcRequest::Id::Which::NONE: c@75: return { RequestOrResponse::RpcId::Absent, 0, "" }; c@75: } c@75: return {}; c@75: } c@75: c@75: static void buildId(RpcResponse::Builder &b, const RequestOrResponse::RpcId &id) c@75: { c@75: switch (id.type) { c@75: case RequestOrResponse::RpcId::Number: c@75: b.getId().setNumber(id.number); c@75: break; c@75: case RequestOrResponse::RpcId::Tag: c@75: b.getId().setTag(id.tag); c@75: break; c@75: case RequestOrResponse::RpcId::Absent: c@75: b.getId().setNone(); c@75: break; c@75: } c@75: } c@75: c@75: RequestOrResponse c@75: readRequestCapnp() c@75: { c@75: RequestOrResponse rr; c@75: rr.direction = RequestOrResponse::Request; c@75: c@75: static kj::FdInputStream stream(0); // stdin c@75: static kj::BufferedInputStreamWrapper buffered(stream); c@75: c@75: if (buffered.tryGetReadBuffer() == nullptr) { c@75: rr.type = RRType::NotValid; c@75: return rr; c@75: } c@75: c@75: ::capnp::InputStreamMessageReader message(buffered); c@75: RpcRequest::Reader reader = message.getRoot(); c@75: c@75: rr.type = VampnProto::getRequestResponseType(reader); c@75: rr.id = readId(reader); c@75: c@75: switch (rr.type) { c@75: c@75: case RRType::List: c@75: VampnProto::readRpcRequest_List(reader); // type check only c@75: break; c@75: case RRType::Load: c@75: VampnProto::readRpcRequest_Load(rr.loadRequest, reader); c@75: break; c@75: case RRType::Configure: c@75: VampnProto::readRpcRequest_Configure(rr.configurationRequest, c@75: reader, mapper); c@75: break; c@75: case RRType::Process: c@75: VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); c@75: break; c@75: case RRType::Finish: c@75: VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); c@75: break; c@75: case RRType::NotValid: c@75: break; c@75: } c@75: c@75: return rr; c@75: } c@75: c@75: void c@75: writeResponseCapnp(RequestOrResponse &rr) c@75: { c@75: ::capnp::MallocMessageBuilder message; c@75: RpcResponse::Builder builder = message.initRoot(); c@75: c@75: buildId(builder, rr.id); c@75: c@75: if (!rr.success) { c@75: c@75: VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); c@75: c@75: } else { c@75: c@75: switch (rr.type) { c@75: c@75: case RRType::List: c@75: VampnProto::buildRpcResponse_List(builder, rr.listResponse); c@75: break; c@75: case RRType::Load: c@75: VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); c@75: break; c@75: case RRType::Configure: c@75: VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); c@75: break; c@75: case RRType::Process: c@75: VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); c@75: break; c@75: case RRType::Finish: c@75: VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); c@75: break; c@75: case RRType::NotValid: c@75: break; c@75: } c@75: } c@75: c@75: writeMessageToFd(1, message); c@75: } c@75: c@75: void c@75: writeExceptionCapnp(const std::exception &e, RRType type) c@75: { c@75: ::capnp::MallocMessageBuilder message; c@75: RpcResponse::Builder builder = message.initRoot(); c@75: VampnProto::buildRpcResponse_Exception(builder, e, type); c@75: c@75: writeMessageToFd(1, message); c@75: } c@75: c@75: RequestOrResponse c@75: handleRequest(const RequestOrResponse &request) c@75: { c@75: RequestOrResponse response; c@75: response.direction = RequestOrResponse::Response; c@75: response.type = request.type; c@75: c@75: auto loader = PluginLoader::getInstance(); c@75: c@75: switch (request.type) { c@75: c@75: case RRType::List: c@75: response.listResponse = loader->listPluginData(); c@75: response.success = true; c@75: break; c@75: c@75: case RRType::Load: c@75: response.loadResponse = loader->loadPlugin(request.loadRequest); c@75: if (response.loadResponse.plugin != nullptr) { c@75: mapper.addPlugin(response.loadResponse.plugin); c@75: response.success = true; c@75: } c@75: break; c@75: c@75: case RRType::Configure: c@75: { c@75: auto &creq = request.configurationRequest; c@75: auto h = mapper.pluginToHandle(creq.plugin); c@75: if (mapper.isConfigured(h)) { c@75: throw runtime_error("plugin has already been configured"); c@75: } c@75: c@75: response.configurationResponse = loader->configurePlugin(creq); c@75: c@75: if (!response.configurationResponse.outputs.empty()) { c@75: mapper.markConfigured c@75: (h, creq.configuration.channelCount, creq.configuration.blockSize); c@75: response.success = true; c@75: } c@75: break; c@75: } c@75: c@75: case RRType::Process: c@75: { c@75: auto &preq = request.processRequest; c@75: auto h = mapper.pluginToHandle(preq.plugin); c@75: if (!mapper.isConfigured(h)) { c@75: throw runtime_error("plugin has not been configured"); c@75: } c@75: c@75: int channels = int(preq.inputBuffers.size()); c@75: if (channels != mapper.getChannelCount(h)) { c@75: throw runtime_error("wrong number of channels supplied to process"); c@75: } c@75: c@75: const float **fbuffers = new const float *[channels]; c@75: for (int i = 0; i < channels; ++i) { c@75: if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { c@75: delete[] fbuffers; c@75: throw runtime_error("wrong block size supplied to process"); c@75: } c@75: fbuffers[i] = preq.inputBuffers[i].data(); c@75: } c@75: c@75: response.processResponse.plugin = preq.plugin; c@75: response.processResponse.features = c@75: preq.plugin->process(fbuffers, preq.timestamp); c@75: response.success = true; c@75: c@75: delete[] fbuffers; c@75: break; c@75: } c@75: c@75: case RRType::Finish: c@75: { c@75: response.finishResponse.plugin = request.finishRequest.plugin; c@75: response.finishResponse.features = c@75: request.finishRequest.plugin->getRemainingFeatures(); c@75: c@75: // We do not delete the plugin here -- we need it in the c@75: // mapper when converting the features. It gets deleted by the c@75: // caller. c@75: c@75: response.success = true; c@75: break; c@75: } c@75: c@75: case RRType::NotValid: c@75: break; c@75: } c@75: c@75: return response; c@75: } c@75: c@75: int main(int argc, char **argv) c@75: { c@75: if (argc != 1) { c@75: usage(); c@75: } c@75: c@75: while (true) { c@75: c@75: RequestOrResponse request; c@75: c@75: try { c@75: c@75: request = readRequestCapnp(); c@75: c@75: cerr << "piper-vamp-server: request received, of type " c@75: << int(request.type) c@75: << endl; c@75: c@75: // NotValid without an exception indicates EOF: c@75: if (request.type == RRType::NotValid) { c@75: cerr << "piper-vamp-server: eof reached" << endl; c@75: break; c@75: } c@75: c@75: RequestOrResponse response = handleRequest(request); c@75: response.id = request.id; c@75: c@75: cerr << "piper-vamp-server: request handled, writing response" c@75: << endl; c@75: c@75: writeResponseCapnp(response); c@75: c@75: cerr << "piper-vamp-server: response written" << endl; c@75: c@75: if (request.type == RRType::Finish) { c@75: auto h = mapper.pluginToHandle(request.finishRequest.plugin); c@75: mapper.removePlugin(h); c@75: delete request.finishRequest.plugin; c@75: } c@75: c@75: } catch (std::exception &e) { c@75: c@75: cerr << "piper-vamp-server: error: " << e.what() << endl; c@75: c@75: writeExceptionCapnp(e, request.type); c@75: c@75: exit(1); c@75: } c@75: } c@75: c@75: exit(0); c@75: }