annotate utilities/vampipe-server.cpp @ 71:6c908ee3ad3b

vamp -> piper for IDL specs
author Chris Cannam <c.cannam@qmul.ac.uk>
date Fri, 07 Oct 2016 14:27:11 +0100
parents a5ba837bca28
children 7bfc07576830
rev   line source
c@31 1
#include "VampnProto.h"

#include "bits/RequestOrResponse.h"
#include "bits/CountingPluginHandleMapper.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <map>
#include <set>
#include <vector>
c@32 13
c@31 14 using namespace std;
c@31 15 using namespace vampipe;
c@32 16 using namespace Vamp;
c@32 17 using namespace Vamp::HostExt;
c@31 18
/**
 * Print a short usage summary to stderr and terminate the process
 * with exit status 2. This function does not return.
 */
void usage()
{
    const std::string program("vampipe-server");

    std::cerr << "\n" << program
              << ": Load and run Vamp plugins in response to messages from stdin\n\n"
              << " Usage: " << program << "\n\n"
              << "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
              << "and writes Vamp response messages in the same format to stdout.\n\n";

    std::exit(2);
}
c@31 30
c@40 31 static CountingPluginHandleMapper mapper;
c@32 32
c@31 33 RequestOrResponse
c@31 34 readRequestCapnp()
c@31 35 {
c@31 36 RequestOrResponse rr;
c@31 37 rr.direction = RequestOrResponse::Request;
c@31 38
c@33 39 static kj::FdInputStream stream(0); // stdin
c@33 40 static kj::BufferedInputStreamWrapper buffered(stream);
c@33 41
c@33 42 if (buffered.tryGetReadBuffer() == nullptr) {
c@33 43 rr.type = RRType::NotValid;
c@33 44 return rr;
c@33 45 }
c@33 46
c@33 47 ::capnp::InputStreamMessageReader message(buffered);
c@68 48 RpcRequest::Reader reader = message.getRoot<RpcRequest>();
c@31 49
c@31 50 rr.type = VampnProto::getRequestResponseType(reader);
c@31 51
c@31 52 switch (rr.type) {
c@31 53
c@31 54 case RRType::List:
c@68 55 VampnProto::readRpcRequest_List(reader); // type check only
c@31 56 break;
c@31 57 case RRType::Load:
c@68 58 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
c@31 59 break;
c@31 60 case RRType::Configure:
c@68 61 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
c@32 62 reader, mapper);
c@31 63 break;
c@31 64 case RRType::Process:
c@68 65 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
c@31 66 break;
c@31 67 case RRType::Finish:
c@68 68 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
c@31 69 break;
c@31 70 case RRType::NotValid:
c@31 71 break;
c@31 72 }
c@31 73
c@31 74 return rr;
c@31 75 }
c@31 76
c@31 77 void
c@31 78 writeResponseCapnp(RequestOrResponse &rr)
c@31 79 {
c@31 80 ::capnp::MallocMessageBuilder message;
c@68 81 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
c@31 82
c@52 83 if (!rr.success) {
c@31 84
c@68 85 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
c@52 86
c@52 87 } else {
c@52 88
c@52 89 switch (rr.type) {
c@52 90
c@52 91 case RRType::List:
c@68 92 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
c@52 93 break;
c@52 94 case RRType::Load:
c@68 95 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
c@52 96 break;
c@52 97 case RRType::Configure:
c@68 98 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
c@52 99 break;
c@52 100 case RRType::Process:
c@68 101 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
c@52 102 break;
c@52 103 case RRType::Finish:
c@68 104 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
c@52 105 break;
c@52 106 case RRType::NotValid:
c@52 107 break;
c@52 108 }
c@31 109 }
c@52 110
c@52 111 writeMessageToFd(1, message);
c@52 112 }
c@31 113
c@52 114 void
c@52 115 writeExceptionCapnp(const std::exception &e, RRType type)
c@52 116 {
c@52 117 ::capnp::MallocMessageBuilder message;
c@68 118 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
c@68 119 VampnProto::buildRpcResponse_Exception(builder, e, type);
c@52 120
c@33 121 writeMessageToFd(1, message);
c@31 122 }
c@31 123
c@31 124 RequestOrResponse
c@34 125 handleRequest(const RequestOrResponse &request)
c@31 126 {
c@31 127 RequestOrResponse response;
c@31 128 response.direction = RequestOrResponse::Response;
c@32 129 response.type = request.type;
c@32 130
c@32 131 auto loader = PluginLoader::getInstance();
c@32 132
c@32 133 switch (request.type) {
c@32 134
c@32 135 case RRType::List:
c@32 136 response.listResponse = loader->listPluginData();
c@32 137 response.success = true;
c@32 138 break;
c@32 139
c@32 140 case RRType::Load:
c@32 141 response.loadResponse = loader->loadPlugin(request.loadRequest);
c@32 142 if (response.loadResponse.plugin != nullptr) {
c@32 143 mapper.addPlugin(response.loadResponse.plugin);
c@32 144 response.success = true;
c@32 145 }
c@32 146 break;
c@32 147
c@33 148 case RRType::Configure:
c@33 149 {
c@34 150 auto &creq = request.configurationRequest;
c@34 151 auto h = mapper.pluginToHandle(creq.plugin);
c@33 152 if (mapper.isConfigured(h)) {
c@33 153 throw runtime_error("plugin has already been configured");
c@33 154 }
c@33 155
c@34 156 response.configurationResponse = loader->configurePlugin(creq);
c@33 157
c@33 158 if (!response.configurationResponse.outputs.empty()) {
c@34 159 mapper.markConfigured
c@34 160 (h, creq.configuration.channelCount, creq.configuration.blockSize);
c@33 161 response.success = true;
c@33 162 }
c@33 163 break;
c@33 164 }
c@33 165
c@33 166 case RRType::Process:
c@33 167 {
c@33 168 auto &preq = request.processRequest;
c@34 169 auto h = mapper.pluginToHandle(preq.plugin);
c@34 170 if (!mapper.isConfigured(h)) {
c@34 171 throw runtime_error("plugin has not been configured");
c@34 172 }
c@34 173
c@33 174 int channels = int(preq.inputBuffers.size());
c@34 175 if (channels != mapper.getChannelCount(h)) {
c@34 176 throw runtime_error("wrong number of channels supplied to process");
c@34 177 }
c@34 178
c@33 179 const float **fbuffers = new const float *[channels];
c@33 180 for (int i = 0; i < channels; ++i) {
c@34 181 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
c@34 182 delete[] fbuffers;
c@34 183 throw runtime_error("wrong block size supplied to process");
c@34 184 }
c@33 185 fbuffers[i] = preq.inputBuffers[i].data();
c@33 186 }
c@33 187
c@52 188 response.processResponse.plugin = preq.plugin;
c@33 189 response.processResponse.features =
c@33 190 preq.plugin->process(fbuffers, preq.timestamp);
c@33 191 response.success = true;
c@33 192
c@33 193 delete[] fbuffers;
c@33 194 break;
c@33 195 }
c@33 196
c@33 197 case RRType::Finish:
c@33 198 {
c@55 199 response.finishResponse.plugin = request.finishRequest.plugin;
c@33 200 response.finishResponse.features =
c@55 201 request.finishRequest.plugin->getRemainingFeatures();
c@52 202
c@52 203 // We do not delete the plugin here -- we need it in the
c@52 204 // mapper when converting the features. It gets deleted by the
c@52 205 // caller.
c@52 206
c@33 207 response.success = true;
c@33 208 break;
c@33 209 }
c@33 210
c@33 211 case RRType::NotValid:
c@33 212 break;
c@32 213 }
c@32 214
c@31 215 return response;
c@31 216 }
c@31 217
c@31 218 int main(int argc, char **argv)
c@31 219 {
c@31 220 if (argc != 1) {
c@31 221 usage();
c@31 222 }
c@31 223
c@31 224 while (true) {
c@31 225
c@52 226 RequestOrResponse request;
c@52 227
c@31 228 try {
c@31 229
c@52 230 request = readRequestCapnp();
c@31 231
c@33 232 cerr << "vampipe-server: request received, of type "
c@33 233 << int(request.type)
c@33 234 << endl;
c@33 235
c@31 236 // NotValid without an exception indicates EOF:
c@33 237 if (request.type == RRType::NotValid) {
c@33 238 cerr << "vampipe-server: eof reached" << endl;
c@33 239 break;
c@33 240 }
c@31 241
c@34 242 RequestOrResponse response = handleRequest(request);
c@33 243
c@34 244 cerr << "vampipe-server: request handled, writing response"
c@33 245 << endl;
c@31 246
c@31 247 writeResponseCapnp(response);
c@33 248
c@33 249 cerr << "vampipe-server: response written" << endl;
c@52 250
c@52 251 if (request.type == RRType::Finish) {
c@55 252 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
c@52 253 mapper.removePlugin(h);
c@55 254 delete request.finishRequest.plugin;
c@52 255 }
c@31 256
c@31 257 } catch (std::exception &e) {
c@52 258
c@33 259 cerr << "vampipe-server: error: " << e.what() << endl;
c@52 260
c@52 261 writeExceptionCapnp(e, request.type);
c@52 262
c@31 263 exit(1);
c@31 264 }
c@31 265 }
c@31 266
c@31 267 exit(0);
c@31 268 }