annotate vamp-server/server.cpp @ 105:491a4bc10a01

Merge
author Chris Cannam <c.cannam@qmul.ac.uk>
date Mon, 17 Oct 2016 16:41:00 +0100
parents cd438188e3f9 6fad5154778e
children c996610b8a46
rev   line source
c@75 1
c@75 2 #include "vamp-capnp/VampnProto.h"
c@75 3 #include "vamp-support/RequestOrResponse.h"
c@75 4 #include "vamp-support/CountingPluginHandleMapper.h"
c@97 5 #include "vamp-support/LoaderRequests.h"
c@75 6
c@75 7 #include <iostream>
c@75 8 #include <sstream>
c@75 9 #include <stdexcept>
c@75 10
c@91 11 #include <capnp/serialize.h>
c@91 12
c@75 13 #include <map>
c@75 14 #include <set>
c@75 15
c@103 16 #include <unistd.h> // getpid for logging
c@103 17
c@75 18 using namespace std;
c@97 19 using namespace piper_vamp;
c@75 20 using namespace Vamp;
c@75 21
c@102 22 //!!! This could be faster and lighter:
c@102 23 // - Use Capnp structures directly rather than converting to vamp-support ones
c@102 24 // - Use Vamp C API (vamp.h) directly rather than converting to C++
c@102 25 //!!! Doing the above for process() and finish() alone would be a good start
c@102 26
c@103 27 static int pid = getpid();
c@103 28
c@75 29 void usage()
c@75 30 {
c@75 31 string myname = "piper-vamp-server";
c@75 32 cerr << "\n" << myname <<
c@75 33 ": Load and run Vamp plugins in response to messages from stdin\n\n"
c@75 34 " Usage: " << myname << "\n\n"
c@75 35 "Expects Piper request messages in Cap'n Proto packed format on stdin,\n"
c@75 36 "and writes Piper response messages in the same format to stdout.\n\n";
c@75 37
c@75 38 exit(2);
c@75 39 }
c@75 40
c@75 41 static CountingPluginHandleMapper mapper;
c@75 42
c@97 43 static RequestOrResponse::RpcId readId(const piper::RpcRequest::Reader &r)
c@75 44 {
c@75 45 int number;
c@75 46 string tag;
c@75 47 switch (r.getId().which()) {
c@97 48 case piper::RpcRequest::Id::Which::NUMBER:
c@75 49 number = r.getId().getNumber();
c@75 50 return { RequestOrResponse::RpcId::Number, number, "" };
c@97 51 case piper::RpcRequest::Id::Which::TAG:
c@75 52 tag = r.getId().getTag();
c@75 53 return { RequestOrResponse::RpcId::Tag, 0, tag };
c@97 54 case piper::RpcRequest::Id::Which::NONE:
c@75 55 return { RequestOrResponse::RpcId::Absent, 0, "" };
c@75 56 }
c@75 57 return {};
c@75 58 }
c@75 59
c@97 60 static void buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
c@75 61 {
c@75 62 switch (id.type) {
c@75 63 case RequestOrResponse::RpcId::Number:
c@75 64 b.getId().setNumber(id.number);
c@75 65 break;
c@75 66 case RequestOrResponse::RpcId::Tag:
c@75 67 b.getId().setTag(id.tag);
c@75 68 break;
c@75 69 case RequestOrResponse::RpcId::Absent:
c@75 70 b.getId().setNone();
c@75 71 break;
c@75 72 }
c@75 73 }
c@75 74
c@75 75 RequestOrResponse
c@75 76 readRequestCapnp()
c@75 77 {
c@75 78 RequestOrResponse rr;
c@75 79 rr.direction = RequestOrResponse::Request;
c@75 80
c@75 81 static kj::FdInputStream stream(0); // stdin
c@75 82 static kj::BufferedInputStreamWrapper buffered(stream);
c@75 83
c@75 84 if (buffered.tryGetReadBuffer() == nullptr) {
c@75 85 rr.type = RRType::NotValid;
c@75 86 return rr;
c@75 87 }
c@75 88
c@97 89 capnp::InputStreamMessageReader message(buffered);
c@97 90 piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>();
c@75 91
c@75 92 rr.type = VampnProto::getRequestResponseType(reader);
c@75 93 rr.id = readId(reader);
c@75 94
c@75 95 switch (rr.type) {
c@75 96
c@75 97 case RRType::List:
c@75 98 VampnProto::readRpcRequest_List(reader); // type check only
c@75 99 break;
c@75 100 case RRType::Load:
c@75 101 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
c@75 102 break;
c@75 103 case RRType::Configure:
c@75 104 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
c@75 105 reader, mapper);
c@75 106 break;
c@75 107 case RRType::Process:
c@75 108 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
c@75 109 break;
c@75 110 case RRType::Finish:
c@75 111 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
c@75 112 break;
c@75 113 case RRType::NotValid:
c@75 114 break;
c@75 115 }
c@75 116
c@75 117 return rr;
c@75 118 }
c@75 119
c@75 120 void
c@75 121 writeResponseCapnp(RequestOrResponse &rr)
c@75 122 {
c@97 123 capnp::MallocMessageBuilder message;
c@97 124 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
c@75 125
c@75 126 buildId(builder, rr.id);
c@75 127
c@75 128 if (!rr.success) {
c@75 129
c@75 130 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
c@75 131
c@75 132 } else {
c@75 133
c@75 134 switch (rr.type) {
c@75 135
c@75 136 case RRType::List:
c@75 137 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
c@75 138 break;
c@75 139 case RRType::Load:
c@75 140 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
c@75 141 break;
c@75 142 case RRType::Configure:
c@75 143 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
c@75 144 break;
c@75 145 case RRType::Process:
c@75 146 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
c@75 147 break;
c@75 148 case RRType::Finish:
c@75 149 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
c@75 150 break;
c@75 151 case RRType::NotValid:
c@75 152 break;
c@75 153 }
c@75 154 }
c@75 155
c@75 156 writeMessageToFd(1, message);
c@75 157 }
c@75 158
c@75 159 void
c@75 160 writeExceptionCapnp(const std::exception &e, RRType type)
c@75 161 {
c@97 162 capnp::MallocMessageBuilder message;
c@97 163 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
c@75 164 VampnProto::buildRpcResponse_Exception(builder, e, type);
c@75 165
c@75 166 writeMessageToFd(1, message);
c@75 167 }
c@75 168
c@75 169 RequestOrResponse
c@75 170 handleRequest(const RequestOrResponse &request)
c@75 171 {
c@75 172 RequestOrResponse response;
c@75 173 response.direction = RequestOrResponse::Response;
c@75 174 response.type = request.type;
c@75 175
c@75 176 switch (request.type) {
c@75 177
c@75 178 case RRType::List:
c@97 179 response.listResponse = LoaderRequests().listPluginData();
c@75 180 response.success = true;
c@75 181 break;
c@75 182
c@75 183 case RRType::Load:
c@97 184 response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest);
c@75 185 if (response.loadResponse.plugin != nullptr) {
c@75 186 mapper.addPlugin(response.loadResponse.plugin);
c@103 187 cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl;
c@75 188 response.success = true;
c@75 189 }
c@75 190 break;
c@75 191
c@75 192 case RRType::Configure:
c@75 193 {
c@75 194 auto &creq = request.configurationRequest;
c@75 195 auto h = mapper.pluginToHandle(creq.plugin);
c@75 196 if (mapper.isConfigured(h)) {
c@75 197 throw runtime_error("plugin has already been configured");
c@75 198 }
c@75 199
c@97 200 response.configurationResponse = LoaderRequests().configurePlugin(creq);
c@75 201
c@75 202 if (!response.configurationResponse.outputs.empty()) {
c@75 203 mapper.markConfigured
c@75 204 (h, creq.configuration.channelCount, creq.configuration.blockSize);
c@75 205 response.success = true;
c@75 206 }
c@75 207 break;
c@75 208 }
c@75 209
c@75 210 case RRType::Process:
c@75 211 {
c@75 212 auto &preq = request.processRequest;
c@75 213 auto h = mapper.pluginToHandle(preq.plugin);
c@75 214 if (!mapper.isConfigured(h)) {
c@75 215 throw runtime_error("plugin has not been configured");
c@75 216 }
c@75 217
c@75 218 int channels = int(preq.inputBuffers.size());
c@75 219 if (channels != mapper.getChannelCount(h)) {
c@75 220 throw runtime_error("wrong number of channels supplied to process");
c@75 221 }
c@75 222
c@75 223 const float **fbuffers = new const float *[channels];
c@75 224 for (int i = 0; i < channels; ++i) {
c@75 225 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
c@75 226 delete[] fbuffers;
c@75 227 throw runtime_error("wrong block size supplied to process");
c@75 228 }
c@75 229 fbuffers[i] = preq.inputBuffers[i].data();
c@75 230 }
c@75 231
c@75 232 response.processResponse.plugin = preq.plugin;
c@75 233 response.processResponse.features =
c@75 234 preq.plugin->process(fbuffers, preq.timestamp);
c@75 235 response.success = true;
c@75 236
c@75 237 delete[] fbuffers;
c@75 238 break;
c@75 239 }
c@75 240
c@75 241 case RRType::Finish:
c@75 242 {
c@77 243 auto &freq = request.finishRequest;
c@77 244 response.finishResponse.plugin = freq.plugin;
c@77 245
c@77 246 auto h = mapper.pluginToHandle(freq.plugin);
c@77 247 // Finish can be called (to unload the plugin) even if the
c@77 248 // plugin has never been configured or used. But we want to
c@77 249 // make sure we call getRemainingFeatures only if we have
c@77 250 // actually configured the plugin.
c@77 251 if (mapper.isConfigured(h)) {
c@77 252 response.finishResponse.features = freq.plugin->getRemainingFeatures();
c@77 253 }
c@75 254
c@75 255 // We do not delete the plugin here -- we need it in the
c@77 256 // mapper when converting the features. It gets deleted in the
c@77 257 // calling function.
c@75 258 response.success = true;
c@75 259 break;
c@75 260 }
c@75 261
c@75 262 case RRType::NotValid:
c@75 263 break;
c@75 264 }
c@75 265
c@75 266 return response;
c@75 267 }
c@75 268
c@103 269 int main(int argc, char **)
c@75 270 {
c@75 271 if (argc != 1) {
c@75 272 usage();
c@75 273 }
c@75 274
c@75 275 while (true) {
c@75 276
c@75 277 RequestOrResponse request;
c@75 278
c@75 279 try {
c@75 280
c@75 281 request = readRequestCapnp();
c@75 282
c@103 283 cerr << "piper-vamp-server " << pid << ": request received, of type "
c@75 284 << int(request.type)
c@75 285 << endl;
c@75 286
c@75 287 // NotValid without an exception indicates EOF:
c@75 288 if (request.type == RRType::NotValid) {
c@103 289 cerr << "piper-vamp-server " << pid << ": eof reached, exiting" << endl;
c@75 290 break;
c@75 291 }
c@75 292
c@75 293 RequestOrResponse response = handleRequest(request);
c@75 294 response.id = request.id;
c@75 295
c@103 296 cerr << "piper-vamp-server " << pid << ": request handled, writing response"
c@75 297 << endl;
c@75 298
c@75 299 writeResponseCapnp(response);
c@75 300
c@103 301 cerr << "piper-vamp-server " << pid << ": response written" << endl;
c@75 302
c@75 303 if (request.type == RRType::Finish) {
c@75 304 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
c@103 305 cerr << "piper-vamp-server " << pid << ": deleting the plugin with handle " << h << endl;
c@75 306 mapper.removePlugin(h);
c@75 307 delete request.finishRequest.plugin;
c@75 308 }
c@75 309
c@75 310 } catch (std::exception &e) {
c@75 311
c@103 312 cerr << "piper-vamp-server " << pid << ": error: " << e.what() << endl;
c@75 313
c@75 314 writeExceptionCapnp(e, request.type);
c@75 315
c@75 316 exit(1);
c@75 317 }
c@75 318 }
c@75 319
c@75 320 exit(0);
c@75 321 }