c@75: c@75: #include "vamp-capnp/VampnProto.h" c@75: #include "vamp-support/RequestOrResponse.h" c@75: #include "vamp-support/CountingPluginHandleMapper.h" c@97: #include "vamp-support/LoaderRequests.h" c@75: c@75: #include c@75: #include c@75: #include c@75: c@91: #include c@91: c@75: #include c@75: #include c@75: c@103: #include // getpid for logging c@103: c@75: using namespace std; c@97: using namespace piper_vamp; c@75: using namespace Vamp; c@75: c@103: static int pid = getpid(); c@103: c@75: void usage() c@75: { c@75: string myname = "piper-vamp-server"; c@75: cerr << "\n" << myname << c@75: ": Load and run Vamp plugins in response to messages from stdin\n\n" c@75: " Usage: " << myname << "\n\n" c@75: "Expects Piper request messages in Cap'n Proto packed format on stdin,\n" c@75: "and writes Piper response messages in the same format to stdout.\n\n"; c@75: c@75: exit(2); c@75: } c@75: c@75: static CountingPluginHandleMapper mapper; c@75: c@97: static RequestOrResponse::RpcId readId(const piper::RpcRequest::Reader &r) c@75: { c@75: int number; c@75: string tag; c@75: switch (r.getId().which()) { c@97: case piper::RpcRequest::Id::Which::NUMBER: c@75: number = r.getId().getNumber(); c@75: return { RequestOrResponse::RpcId::Number, number, "" }; c@97: case piper::RpcRequest::Id::Which::TAG: c@75: tag = r.getId().getTag(); c@75: return { RequestOrResponse::RpcId::Tag, 0, tag }; c@97: case piper::RpcRequest::Id::Which::NONE: c@75: return { RequestOrResponse::RpcId::Absent, 0, "" }; c@75: } c@75: return {}; c@75: } c@75: c@97: static void buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id) c@75: { c@75: switch (id.type) { c@75: case RequestOrResponse::RpcId::Number: c@75: b.getId().setNumber(id.number); c@75: break; c@75: case RequestOrResponse::RpcId::Tag: c@75: b.getId().setTag(id.tag); c@75: break; c@75: case RequestOrResponse::RpcId::Absent: c@75: b.getId().setNone(); c@75: break; c@75: } c@75: } c@75: c@75: RequestOrResponse c@75: readRequestCapnp() c@75: { c@75: RequestOrResponse rr; c@75: rr.direction = RequestOrResponse::Request; c@75: c@75: static kj::FdInputStream stream(0); // stdin c@75: static kj::BufferedInputStreamWrapper buffered(stream); c@75: c@75: if (buffered.tryGetReadBuffer() == nullptr) { c@75: rr.type = RRType::NotValid; c@75: return rr; c@75: } c@75: c@97: capnp::InputStreamMessageReader message(buffered); c@97: piper::RpcRequest::Reader reader = message.getRoot(); c@75: c@75: rr.type = VampnProto::getRequestResponseType(reader); c@75: rr.id = readId(reader); c@75: c@75: switch (rr.type) { c@75: c@75: case RRType::List: c@75: VampnProto::readRpcRequest_List(reader); // type check only c@75: break; c@75: case RRType::Load: c@75: VampnProto::readRpcRequest_Load(rr.loadRequest, reader); c@75: break; c@75: case RRType::Configure: c@75: VampnProto::readRpcRequest_Configure(rr.configurationRequest, c@75: reader, mapper); c@75: break; c@75: case RRType::Process: c@75: VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); c@75: break; c@75: case RRType::Finish: c@75: VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); c@75: break; c@75: case RRType::NotValid: c@75: break; c@75: } c@75: c@75: return rr; c@75: } c@75: c@75: void c@75: writeResponseCapnp(RequestOrResponse &rr) c@75: { c@97: capnp::MallocMessageBuilder message; c@97: piper::RpcResponse::Builder builder = message.initRoot(); c@75: c@75: buildId(builder, rr.id); c@75: c@75: if (!rr.success) { c@75: c@75: VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); c@75: c@75: } else { c@75: c@75: switch (rr.type) { c@75: c@75: case RRType::List: c@75: VampnProto::buildRpcResponse_List(builder, rr.listResponse); c@75: break; c@75: case RRType::Load: c@75: VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); c@75: break; c@75: case RRType::Configure: c@75: VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); c@75: break; c@75: case RRType::Process: c@75: VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); c@75: break; c@75: case RRType::Finish: c@75: VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); c@75: break; c@75: case RRType::NotValid: c@75: break; c@75: } c@75: } c@75: c@75: writeMessageToFd(1, message); c@75: } c@75: c@75: void c@75: writeExceptionCapnp(const std::exception &e, RRType type) c@75: { c@97: capnp::MallocMessageBuilder message; c@97: piper::RpcResponse::Builder builder = message.initRoot(); c@75: VampnProto::buildRpcResponse_Exception(builder, e, type); c@75: c@75: writeMessageToFd(1, message); c@75: } c@75: c@75: RequestOrResponse c@75: handleRequest(const RequestOrResponse &request) c@75: { c@75: RequestOrResponse response; c@75: response.direction = RequestOrResponse::Response; c@75: response.type = request.type; c@75: c@75: switch (request.type) { c@75: c@75: case RRType::List: c@97: response.listResponse = LoaderRequests().listPluginData(); c@75: response.success = true; c@75: break; c@75: c@75: case RRType::Load: c@97: response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest); c@75: if (response.loadResponse.plugin != nullptr) { c@75: mapper.addPlugin(response.loadResponse.plugin); c@103: cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl; c@75: response.success = true; c@75: } c@75: break; c@75: c@75: case RRType::Configure: c@75: { c@75: auto &creq = request.configurationRequest; c@75: auto h = mapper.pluginToHandle(creq.plugin); c@75: if (mapper.isConfigured(h)) { c@75: throw runtime_error("plugin has already been configured"); c@75: } c@75: c@97: response.configurationResponse = LoaderRequests().configurePlugin(creq); c@75: c@75: if (!response.configurationResponse.outputs.empty()) { c@75: mapper.markConfigured c@75: (h, creq.configuration.channelCount, creq.configuration.blockSize); c@75: response.success = true; c@75: } c@75: break; c@75: } c@75: c@75: case RRType::Process: c@75: { c@75: auto &preq = request.processRequest; c@75: auto h = mapper.pluginToHandle(preq.plugin); c@75: if (!mapper.isConfigured(h)) { c@75: throw runtime_error("plugin has not been configured"); c@75: } c@75: c@75: int channels = int(preq.inputBuffers.size()); c@75: if (channels != mapper.getChannelCount(h)) { c@75: throw runtime_error("wrong number of channels supplied to process"); c@75: } c@75: c@75: const float **fbuffers = new const float *[channels]; c@75: for (int i = 0; i < channels; ++i) { c@75: if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { c@75: delete[] fbuffers; c@75: throw runtime_error("wrong block size supplied to process"); c@75: } c@75: fbuffers[i] = preq.inputBuffers[i].data(); c@75: } c@75: c@75: response.processResponse.plugin = preq.plugin; c@75: response.processResponse.features = c@75: preq.plugin->process(fbuffers, preq.timestamp); c@75: response.success = true; c@75: c@75: delete[] fbuffers; c@75: break; c@75: } c@75: c@75: case RRType::Finish: c@75: { c@77: auto &freq = request.finishRequest; c@77: response.finishResponse.plugin = freq.plugin; c@77: c@77: auto h = mapper.pluginToHandle(freq.plugin); c@77: // Finish can be called (to unload the plugin) even if the c@77: // plugin has never been configured or used. But we want to c@77: // make sure we call getRemainingFeatures only if we have c@77: // actually configured the plugin. c@77: if (mapper.isConfigured(h)) { c@77: response.finishResponse.features = freq.plugin->getRemainingFeatures(); c@77: } c@75: c@75: // We do not delete the plugin here -- we need it in the c@77: // mapper when converting the features. It gets deleted in the c@77: // calling function. c@75: response.success = true; c@75: break; c@75: } c@75: c@75: case RRType::NotValid: c@75: break; c@75: } c@75: c@75: return response; c@75: } c@75: c@103: int main(int argc, char **) c@75: { c@75: if (argc != 1) { c@75: usage(); c@75: } c@75: c@75: while (true) { c@75: c@75: RequestOrResponse request; c@75: c@75: try { c@75: c@75: request = readRequestCapnp(); c@75: c@103: cerr << "piper-vamp-server " << pid << ": request received, of type " c@75: << int(request.type) c@75: << endl; c@75: c@75: // NotValid without an exception indicates EOF: c@75: if (request.type == RRType::NotValid) { c@103: cerr << "piper-vamp-server " << pid << ": eof reached, exiting" << endl; c@75: break; c@75: } c@75: c@75: RequestOrResponse response = handleRequest(request); c@75: response.id = request.id; c@75: c@103: cerr << "piper-vamp-server " << pid << ": request handled, writing response" c@75: << endl; c@75: c@75: writeResponseCapnp(response); c@75: c@103: cerr << "piper-vamp-server " << pid << ": response written" << endl; c@75: c@75: if (request.type == RRType::Finish) { c@75: auto h = mapper.pluginToHandle(request.finishRequest.plugin); c@103: cerr << "piper-vamp-server " << pid << ": deleting the plugin with handle " << h << endl; c@75: mapper.removePlugin(h); c@75: delete request.finishRequest.plugin; c@75: } c@75: c@75: } catch (std::exception &e) { c@75: c@103: cerr << "piper-vamp-server " << pid << ": error: " << e.what() << endl; c@75: c@75: writeExceptionCapnp(e, request.type); c@75: c@75: exit(1); c@75: } c@75: } c@75: c@75: exit(0); c@75: }