annotate vamp-server/server.cpp @ 77:ac1e4634479b

Ensure finish() can be called even if not configured (otherwise there's no way to dispose of the plugin)
author Chris Cannam <c.cannam@qmul.ac.uk>
date Tue, 11 Oct 2016 14:36:43 +0100
parents 81e1c48e97f9
children 03ed2e0a6c8f
rev   line source
c@75 1
#include "vamp-capnp/VampnProto.h"
#include "vamp-support/RequestOrResponse.h"
#include "vamp-support/CountingPluginHandleMapper.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <map>
#include <set>
#include <vector>
c@75 12
c@75 13 using namespace std;
c@75 14 using namespace piper;
c@75 15 using namespace Vamp;
c@75 16 using namespace Vamp::HostExt;
c@75 17
/**
 * Print a short usage summary to stderr and terminate the process
 * with exit status 2. This function does not return.
 *
 * NOTE(review): the text below says "packed format", but the reader
 * and writer used elsewhere in this file are InputStreamMessageReader
 * and writeMessageToFd -- confirm whether the wording is accurate.
 */
void usage()
{
    const std::string programName = "piper-vamp-server";

    std::cerr << "\n" << programName
              << ": Load and run Vamp plugins in response to messages from stdin\n\n"
              << " Usage: " << programName << "\n\n"
              << "Expects Piper request messages in Cap'n Proto packed format on stdin,\n"
              << "and writes Piper response messages in the same format to stdout.\n\n";

    exit(2);
}
c@75 29
// File-wide plugin/handle registry. The functions in this file use it
// to add and remove loaded plugins, to translate a plugin into its
// handle, and to record and query whether a plugin has been configured
// along with its channel count and block size.
static CountingPluginHandleMapper mapper;
c@75 31
c@75 32 static RequestOrResponse::RpcId readId(const RpcRequest::Reader &r)
c@75 33 {
c@75 34 int number;
c@75 35 string tag;
c@75 36 switch (r.getId().which()) {
c@75 37 case RpcRequest::Id::Which::NUMBER:
c@75 38 number = r.getId().getNumber();
c@75 39 return { RequestOrResponse::RpcId::Number, number, "" };
c@75 40 case RpcRequest::Id::Which::TAG:
c@75 41 tag = r.getId().getTag();
c@75 42 return { RequestOrResponse::RpcId::Tag, 0, tag };
c@75 43 case RpcRequest::Id::Which::NONE:
c@75 44 return { RequestOrResponse::RpcId::Absent, 0, "" };
c@75 45 }
c@75 46 return {};
c@75 47 }
c@75 48
c@75 49 static void buildId(RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
c@75 50 {
c@75 51 switch (id.type) {
c@75 52 case RequestOrResponse::RpcId::Number:
c@75 53 b.getId().setNumber(id.number);
c@75 54 break;
c@75 55 case RequestOrResponse::RpcId::Tag:
c@75 56 b.getId().setTag(id.tag);
c@75 57 break;
c@75 58 case RequestOrResponse::RpcId::Absent:
c@75 59 b.getId().setNone();
c@75 60 break;
c@75 61 }
c@75 62 }
c@75 63
c@75 64 RequestOrResponse
c@75 65 readRequestCapnp()
c@75 66 {
c@75 67 RequestOrResponse rr;
c@75 68 rr.direction = RequestOrResponse::Request;
c@75 69
c@75 70 static kj::FdInputStream stream(0); // stdin
c@75 71 static kj::BufferedInputStreamWrapper buffered(stream);
c@75 72
c@75 73 if (buffered.tryGetReadBuffer() == nullptr) {
c@75 74 rr.type = RRType::NotValid;
c@75 75 return rr;
c@75 76 }
c@75 77
c@75 78 ::capnp::InputStreamMessageReader message(buffered);
c@75 79 RpcRequest::Reader reader = message.getRoot<RpcRequest>();
c@75 80
c@75 81 rr.type = VampnProto::getRequestResponseType(reader);
c@75 82 rr.id = readId(reader);
c@75 83
c@75 84 switch (rr.type) {
c@75 85
c@75 86 case RRType::List:
c@75 87 VampnProto::readRpcRequest_List(reader); // type check only
c@75 88 break;
c@75 89 case RRType::Load:
c@75 90 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
c@75 91 break;
c@75 92 case RRType::Configure:
c@75 93 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
c@75 94 reader, mapper);
c@75 95 break;
c@75 96 case RRType::Process:
c@75 97 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
c@75 98 break;
c@75 99 case RRType::Finish:
c@75 100 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
c@75 101 break;
c@75 102 case RRType::NotValid:
c@75 103 break;
c@75 104 }
c@75 105
c@75 106 return rr;
c@75 107 }
c@75 108
c@75 109 void
c@75 110 writeResponseCapnp(RequestOrResponse &rr)
c@75 111 {
c@75 112 ::capnp::MallocMessageBuilder message;
c@75 113 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
c@75 114
c@75 115 buildId(builder, rr.id);
c@75 116
c@75 117 if (!rr.success) {
c@75 118
c@75 119 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
c@75 120
c@75 121 } else {
c@75 122
c@75 123 switch (rr.type) {
c@75 124
c@75 125 case RRType::List:
c@75 126 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
c@75 127 break;
c@75 128 case RRType::Load:
c@75 129 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
c@75 130 break;
c@75 131 case RRType::Configure:
c@75 132 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
c@75 133 break;
c@75 134 case RRType::Process:
c@75 135 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
c@75 136 break;
c@75 137 case RRType::Finish:
c@75 138 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
c@75 139 break;
c@75 140 case RRType::NotValid:
c@75 141 break;
c@75 142 }
c@75 143 }
c@75 144
c@75 145 writeMessageToFd(1, message);
c@75 146 }
c@75 147
c@75 148 void
c@75 149 writeExceptionCapnp(const std::exception &e, RRType type)
c@75 150 {
c@75 151 ::capnp::MallocMessageBuilder message;
c@75 152 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
c@75 153 VampnProto::buildRpcResponse_Exception(builder, e, type);
c@75 154
c@75 155 writeMessageToFd(1, message);
c@75 156 }
c@75 157
c@75 158 RequestOrResponse
c@75 159 handleRequest(const RequestOrResponse &request)
c@75 160 {
c@75 161 RequestOrResponse response;
c@75 162 response.direction = RequestOrResponse::Response;
c@75 163 response.type = request.type;
c@75 164
c@75 165 auto loader = PluginLoader::getInstance();
c@75 166
c@75 167 switch (request.type) {
c@75 168
c@75 169 case RRType::List:
c@75 170 response.listResponse = loader->listPluginData();
c@75 171 response.success = true;
c@75 172 break;
c@75 173
c@75 174 case RRType::Load:
c@75 175 response.loadResponse = loader->loadPlugin(request.loadRequest);
c@75 176 if (response.loadResponse.plugin != nullptr) {
c@75 177 mapper.addPlugin(response.loadResponse.plugin);
c@75 178 response.success = true;
c@75 179 }
c@75 180 break;
c@75 181
c@75 182 case RRType::Configure:
c@75 183 {
c@75 184 auto &creq = request.configurationRequest;
c@75 185 auto h = mapper.pluginToHandle(creq.plugin);
c@75 186 if (mapper.isConfigured(h)) {
c@75 187 throw runtime_error("plugin has already been configured");
c@75 188 }
c@75 189
c@75 190 response.configurationResponse = loader->configurePlugin(creq);
c@75 191
c@75 192 if (!response.configurationResponse.outputs.empty()) {
c@75 193 mapper.markConfigured
c@75 194 (h, creq.configuration.channelCount, creq.configuration.blockSize);
c@75 195 response.success = true;
c@75 196 }
c@75 197 break;
c@75 198 }
c@75 199
c@75 200 case RRType::Process:
c@75 201 {
c@75 202 auto &preq = request.processRequest;
c@75 203 auto h = mapper.pluginToHandle(preq.plugin);
c@75 204 if (!mapper.isConfigured(h)) {
c@75 205 throw runtime_error("plugin has not been configured");
c@75 206 }
c@75 207
c@75 208 int channels = int(preq.inputBuffers.size());
c@75 209 if (channels != mapper.getChannelCount(h)) {
c@75 210 throw runtime_error("wrong number of channels supplied to process");
c@75 211 }
c@75 212
c@75 213 const float **fbuffers = new const float *[channels];
c@75 214 for (int i = 0; i < channels; ++i) {
c@75 215 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
c@75 216 delete[] fbuffers;
c@75 217 throw runtime_error("wrong block size supplied to process");
c@75 218 }
c@75 219 fbuffers[i] = preq.inputBuffers[i].data();
c@75 220 }
c@75 221
c@75 222 response.processResponse.plugin = preq.plugin;
c@75 223 response.processResponse.features =
c@75 224 preq.plugin->process(fbuffers, preq.timestamp);
c@75 225 response.success = true;
c@75 226
c@75 227 delete[] fbuffers;
c@75 228 break;
c@75 229 }
c@75 230
c@75 231 case RRType::Finish:
c@75 232 {
c@77 233 auto &freq = request.finishRequest;
c@77 234 response.finishResponse.plugin = freq.plugin;
c@77 235
c@77 236 auto h = mapper.pluginToHandle(freq.plugin);
c@77 237 // Finish can be called (to unload the plugin) even if the
c@77 238 // plugin has never been configured or used. But we want to
c@77 239 // make sure we call getRemainingFeatures only if we have
c@77 240 // actually configured the plugin.
c@77 241 if (mapper.isConfigured(h)) {
c@77 242 response.finishResponse.features = freq.plugin->getRemainingFeatures();
c@77 243 }
c@75 244
c@75 245 // We do not delete the plugin here -- we need it in the
c@77 246 // mapper when converting the features. It gets deleted in the
c@77 247 // calling function.
c@75 248 response.success = true;
c@75 249 break;
c@75 250 }
c@75 251
c@75 252 case RRType::NotValid:
c@75 253 break;
c@75 254 }
c@75 255
c@75 256 return response;
c@75 257 }
c@75 258
c@75 259 int main(int argc, char **argv)
c@75 260 {
c@75 261 if (argc != 1) {
c@75 262 usage();
c@75 263 }
c@75 264
c@75 265 while (true) {
c@75 266
c@75 267 RequestOrResponse request;
c@75 268
c@75 269 try {
c@75 270
c@75 271 request = readRequestCapnp();
c@75 272
c@75 273 cerr << "piper-vamp-server: request received, of type "
c@75 274 << int(request.type)
c@75 275 << endl;
c@75 276
c@75 277 // NotValid without an exception indicates EOF:
c@75 278 if (request.type == RRType::NotValid) {
c@75 279 cerr << "piper-vamp-server: eof reached" << endl;
c@75 280 break;
c@75 281 }
c@75 282
c@75 283 RequestOrResponse response = handleRequest(request);
c@75 284 response.id = request.id;
c@75 285
c@75 286 cerr << "piper-vamp-server: request handled, writing response"
c@75 287 << endl;
c@75 288
c@75 289 writeResponseCapnp(response);
c@75 290
c@75 291 cerr << "piper-vamp-server: response written" << endl;
c@75 292
c@75 293 if (request.type == RRType::Finish) {
c@75 294 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
c@75 295 mapper.removePlugin(h);
c@75 296 delete request.finishRequest.plugin;
c@75 297 }
c@75 298
c@75 299 } catch (std::exception &e) {
c@75 300
c@75 301 cerr << "piper-vamp-server: error: " << e.what() << endl;
c@75 302
c@75 303 writeExceptionCapnp(e, request.type);
c@75 304
c@75 305 exit(1);
c@75 306 }
c@75 307 }
c@75 308
c@75 309 exit(0);
c@75 310 }