c@31: c@31: #include "VampnProto.h" c@31: c@31: #include "bits/RequestOrResponse.h" c@40: #include "bits/CountingPluginHandleMapper.h" c@31: c@31: #include c@31: #include c@31: #include c@31: c@32: #include c@32: #include c@32: c@31: using namespace std; c@31: using namespace vampipe; c@32: using namespace Vamp; c@32: using namespace Vamp::HostExt; c@31: c@31: void usage() c@31: { c@31: string myname = "vampipe-server"; c@31: cerr << "\n" << myname << c@31: ": Load and run Vamp plugins in response to messages from stdin\n\n" c@31: " Usage: " << myname << "\n\n" c@31: "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n" c@31: "and writes Vamp response messages in the same format to stdout.\n\n"; c@31: c@31: exit(2); c@31: } c@31: c@40: static CountingPluginHandleMapper mapper; c@32: c@31: RequestOrResponse c@31: readRequestCapnp() c@31: { c@31: RequestOrResponse rr; c@31: rr.direction = RequestOrResponse::Request; c@31: c@33: static kj::FdInputStream stream(0); // stdin c@33: static kj::BufferedInputStreamWrapper buffered(stream); c@33: c@33: if (buffered.tryGetReadBuffer() == nullptr) { c@33: rr.type = RRType::NotValid; c@33: return rr; c@33: } c@33: c@33: ::capnp::InputStreamMessageReader message(buffered); c@68: RpcRequest::Reader reader = message.getRoot(); c@31: c@31: rr.type = VampnProto::getRequestResponseType(reader); c@31: c@31: switch (rr.type) { c@31: c@31: case RRType::List: c@68: VampnProto::readRpcRequest_List(reader); // type check only c@31: break; c@31: case RRType::Load: c@68: VampnProto::readRpcRequest_Load(rr.loadRequest, reader); c@31: break; c@31: case RRType::Configure: c@68: VampnProto::readRpcRequest_Configure(rr.configurationRequest, c@32: reader, mapper); c@31: break; c@31: case RRType::Process: c@68: VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); c@31: break; c@31: case RRType::Finish: c@68: VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); c@31: break; c@31: case RRType::NotValid: c@31: break; c@31: } c@31: c@31: return rr; c@31: } c@31: c@31: void c@31: writeResponseCapnp(RequestOrResponse &rr) c@31: { c@31: ::capnp::MallocMessageBuilder message; c@68: RpcResponse::Builder builder = message.initRoot(); c@31: c@52: if (!rr.success) { c@31: c@68: VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); c@52: c@52: } else { c@52: c@52: switch (rr.type) { c@52: c@52: case RRType::List: c@68: VampnProto::buildRpcResponse_List(builder, rr.listResponse); c@52: break; c@52: case RRType::Load: c@68: VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); c@52: break; c@52: case RRType::Configure: c@68: VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); c@52: break; c@52: case RRType::Process: c@68: VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); c@52: break; c@52: case RRType::Finish: c@68: VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); c@52: break; c@52: case RRType::NotValid: c@52: break; c@52: } c@31: } c@52: c@52: writeMessageToFd(1, message); c@52: } c@31: c@52: void c@52: writeExceptionCapnp(const std::exception &e, RRType type) c@52: { c@52: ::capnp::MallocMessageBuilder message; c@68: RpcResponse::Builder builder = message.initRoot(); c@68: VampnProto::buildRpcResponse_Exception(builder, e, type); c@52: c@33: writeMessageToFd(1, message); c@31: } c@31: c@31: RequestOrResponse c@34: handleRequest(const RequestOrResponse &request) c@31: { c@31: RequestOrResponse response; c@31: response.direction = RequestOrResponse::Response; c@32: response.type = request.type; c@32: c@32: auto loader = PluginLoader::getInstance(); c@32: c@32: switch (request.type) { c@32: c@32: case RRType::List: c@32: response.listResponse = loader->listPluginData(); c@32: response.success = true; c@32: break; c@32: c@32: case RRType::Load: c@32: response.loadResponse = loader->loadPlugin(request.loadRequest); c@32: if (response.loadResponse.plugin != nullptr) { c@32: mapper.addPlugin(response.loadResponse.plugin); c@32: response.success = true; c@32: } c@32: break; c@32: c@33: case RRType::Configure: c@33: { c@34: auto &creq = request.configurationRequest; c@34: auto h = mapper.pluginToHandle(creq.plugin); c@33: if (mapper.isConfigured(h)) { c@33: throw runtime_error("plugin has already been configured"); c@33: } c@33: c@34: response.configurationResponse = loader->configurePlugin(creq); c@33: c@33: if (!response.configurationResponse.outputs.empty()) { c@34: mapper.markConfigured c@34: (h, creq.configuration.channelCount, creq.configuration.blockSize); c@33: response.success = true; c@33: } c@33: break; c@33: } c@33: c@33: case RRType::Process: c@33: { c@33: auto &preq = request.processRequest; c@34: auto h = mapper.pluginToHandle(preq.plugin); c@34: if (!mapper.isConfigured(h)) { c@34: throw runtime_error("plugin has not been configured"); c@34: } c@34: c@33: int channels = int(preq.inputBuffers.size()); c@34: if (channels != mapper.getChannelCount(h)) { c@34: throw runtime_error("wrong number of channels supplied to process"); c@34: } c@34: c@33: const float **fbuffers = new const float *[channels]; c@33: for (int i = 0; i < channels; ++i) { c@34: if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { c@34: delete[] fbuffers; c@34: throw runtime_error("wrong block size supplied to process"); c@34: } c@33: fbuffers[i] = preq.inputBuffers[i].data(); c@33: } c@33: c@52: response.processResponse.plugin = preq.plugin; c@33: response.processResponse.features = c@33: preq.plugin->process(fbuffers, preq.timestamp); c@33: response.success = true; c@33: c@33: delete[] fbuffers; c@33: break; c@33: } c@33: c@33: case RRType::Finish: c@33: { c@55: response.finishResponse.plugin = request.finishRequest.plugin; c@33: response.finishResponse.features = c@55: request.finishRequest.plugin->getRemainingFeatures(); c@52: c@52: // We do not delete the plugin here -- we need it in the c@52: // mapper when converting the features. It gets deleted by the c@52: // caller. c@52: c@33: response.success = true; c@33: break; c@33: } c@33: c@33: case RRType::NotValid: c@33: break; c@32: } c@32: c@31: return response; c@31: } c@31: c@31: int main(int argc, char **argv) c@31: { c@31: if (argc != 1) { c@31: usage(); c@31: } c@31: c@31: while (true) { c@31: c@52: RequestOrResponse request; c@52: c@31: try { c@31: c@52: request = readRequestCapnp(); c@31: c@33: cerr << "vampipe-server: request received, of type " c@33: << int(request.type) c@33: << endl; c@33: c@31: // NotValid without an exception indicates EOF: c@33: if (request.type == RRType::NotValid) { c@33: cerr << "vampipe-server: eof reached" << endl; c@33: break; c@33: } c@31: c@34: RequestOrResponse response = handleRequest(request); c@33: c@34: cerr << "vampipe-server: request handled, writing response" c@33: << endl; c@31: c@31: writeResponseCapnp(response); c@33: c@33: cerr << "vampipe-server: response written" << endl; c@52: c@52: if (request.type == RRType::Finish) { c@55: auto h = mapper.pluginToHandle(request.finishRequest.plugin); c@52: mapper.removePlugin(h); c@55: delete request.finishRequest.plugin; c@52: } c@31: c@31: } catch (std::exception &e) { c@52: c@33: cerr << "vampipe-server: error: " << e.what() << endl; c@52: c@52: writeExceptionCapnp(e, request.type); c@52: c@31: exit(1); c@31: } c@31: } c@31: c@31: exit(0); c@31: }