comparison vamp-server/server.cpp @ 116:d15cb1151d76

Add JSON support directly to the server. Had hoped to avoid this (using Capnp as canonical in the server and then converting externally as necessary) but it's just too useful for debugging purposes when bundled with client app
author Chris Cannam <c.cannam@qmul.ac.uk>
date Thu, 27 Oct 2016 11:39:41 +0100
parents b418b583fd3b
children ff3fd8d1b2dc
comparison
equal deleted inserted replaced
115:5a716f08e4be 116:d15cb1151d76
1 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
2
3 #include "vamp-json/VampJson.h"
2 #include "vamp-capnp/VampnProto.h" 4 #include "vamp-capnp/VampnProto.h"
3 #include "vamp-support/RequestOrResponse.h" 5 #include "vamp-support/RequestOrResponse.h"
4 #include "vamp-support/CountingPluginHandleMapper.h" 6 #include "vamp-support/CountingPluginHandleMapper.h"
5 #include "vamp-support/LoaderRequests.h" 7 #include "vamp-support/LoaderRequests.h"
6 8
21 #include <unistd.h> 23 #include <unistd.h>
22 static int pid = getpid(); 24 static int pid = getpid();
23 #endif 25 #endif
24 26
25 using namespace std; 27 using namespace std;
28 using namespace json11;
26 using namespace piper_vamp; 29 using namespace piper_vamp;
27 using namespace Vamp; 30 using namespace Vamp;
28 31
29 //!!! This could be faster and lighter: 32 //!!! This could be faster and lighter:
30 // - Use Capnp structures directly rather than converting to vamp-support ones 33 // - Use Capnp structures directly rather than converting to vamp-support ones
31 // - Use Vamp C API (vamp.h) directly rather than converting to C++ 34 // - Use Vamp C API (vamp.h) directly rather than converting to C++
32 //!!! Doing the above for process() and finish() alone would be a good start 35 //!!! Doing the above for process() and finish() alone would be a good start
33 36
34 void usage() 37 static string myname = "piper-vamp-server";
35 { 38
36 string myname = "piper-vamp-server"; 39 static void version()
40 {
41 cout << "1.0" << endl;
42 exit(0);
43 }
44
45 static void usage(bool successful = false)
46 {
37 cerr << "\n" << myname << 47 cerr << "\n" << myname <<
38 ": Load and run Vamp plugins in response to messages from stdin\n\n" 48 ": Load and run Vamp plugins in response to Piper messages\n\n"
39 " Usage: " << myname << "\n\n" 49 " Usage: " << myname << " [-d] <format>\n"
40 "Expects Piper request messages in Cap'n Proto packed format on stdin,\n" 50 " " << myname << " -v\n"
41 "and writes Piper response messages in the same format to stdout.\n\n"; 51 " " << myname << " -h\n\n"
42 52 " where\n"
43 exit(2); 53 " <format>: the format to read and write messages in (\"json\" or \"capnp\")\n"
54 " -d: also print debug information to stderr\n"
55 " -v: print version number to stdout and exit\n"
56 " -h: print this text to stderr and exit\n\n"
57 "Expects Piper request messages in either Cap'n Proto or JSON format on stdin,\n"
58 "and writes response messages in the same format to stdout.\n\n";
59 if (successful) exit(0);
60 else exit(2);
44 } 61 }
45 62
46 static CountingPluginHandleMapper mapper; 63 static CountingPluginHandleMapper mapper;
47 64
48 static RequestOrResponse::RpcId readId(const piper::RpcRequest::Reader &r) 65 static RequestOrResponse::RpcId
66 readId(const piper::RpcRequest::Reader &r)
49 { 67 {
50 int number; 68 int number;
51 string tag; 69 string tag;
52 switch (r.getId().which()) { 70 switch (r.getId().which()) {
53 case piper::RpcRequest::Id::Which::NUMBER: 71 case piper::RpcRequest::Id::Which::NUMBER:
60 return { RequestOrResponse::RpcId::Absent, 0, "" }; 78 return { RequestOrResponse::RpcId::Absent, 0, "" };
61 } 79 }
62 return {}; 80 return {};
63 } 81 }
64 82
65 static void buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id) 83 static void
84 buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
66 { 85 {
67 switch (id.type) { 86 switch (id.type) {
68 case RequestOrResponse::RpcId::Number: 87 case RequestOrResponse::RpcId::Number:
69 b.getId().setNumber(id.number); 88 b.getId().setNumber(id.number);
70 break; 89 break;
75 b.getId().setNone(); 94 b.getId().setNone();
76 break; 95 break;
77 } 96 }
78 } 97 }
79 98
99 static RequestOrResponse::RpcId
100 readJsonId(const Json &j)
101 {
102 RequestOrResponse::RpcId id;
103
104 if (j["id"].is_number()) {
105 id.type = RequestOrResponse::RpcId::Number;
106 id.number = j["id"].number_value();
107 } else if (j["id"].is_string()) {
108 id.type = RequestOrResponse::RpcId::Tag;
109 id.tag = j["id"].string_value();
110 } else {
111 id.type = RequestOrResponse::RpcId::Absent;
112 }
113
114 return id;
115 }
116
117 static Json
118 writeJsonId(const RequestOrResponse::RpcId &id)
119 {
120 if (id.type == RequestOrResponse::RpcId::Number) {
121 return id.number;
122 } else if (id.type == RequestOrResponse::RpcId::Tag) {
123 return id.tag;
124 } else {
125 return Json();
126 }
127 }
128
129 static Json
130 convertRequestJson(string input, string &err)
131 {
132 Json j = Json::parse(input, err);
133 if (err != "") {
134 err = "invalid json: " + err;
135 return {};
136 }
137 if (!j.is_object()) {
138 err = "object expected at top level";
139 } else if (!j["method"].is_string()) {
140 err = "string expected for method field";
141 } else if (!j["params"].is_null() && !j["params"].is_object()) {
142 err = "object expected for params field";
143 }
144 return j;
145 }
146
147 RequestOrResponse
148 readRequestJson(string &err)
149 {
150 RequestOrResponse rr;
151 rr.direction = RequestOrResponse::Request;
152
153 string input;
154 if (!getline(cin, input)) {
155 // the EOF case, not actually an error
156 rr.type = RRType::NotValid;
157 return rr;
158 }
159
160 Json j = convertRequestJson(input, err);
161 if (err != "") return {};
162
163 rr.type = VampJson::getRequestResponseType(j, err);
164 if (err != "") return {};
165
166 rr.id = readJsonId(j);
167
168 VampJson::BufferSerialisation serialisation =
169 VampJson::BufferSerialisation::Array;
170
171 switch (rr.type) {
172
173 case RRType::List:
174 VampJson::toRpcRequest_List(j, err); // type check only
175 break;
176 case RRType::Load:
177 rr.loadRequest = VampJson::toRpcRequest_Load(j, err);
178 break;
179 case RRType::Configure:
180 rr.configurationRequest = VampJson::toRpcRequest_Configure(j, mapper, err);
181 break;
182 case RRType::Process:
183 rr.processRequest = VampJson::toRpcRequest_Process(j, mapper, serialisation, err);
184 break;
185 case RRType::Finish:
186 rr.finishRequest = VampJson::toRpcRequest_Finish(j, mapper, err);
187 break;
188 case RRType::NotValid:
189 break;
190 }
191
192 return rr;
193 }
194
195 void
196 writeResponseJson(RequestOrResponse &rr, bool useBase64)
197 {
198 Json j;
199
200 VampJson::BufferSerialisation serialisation =
201 (useBase64 ?
202 VampJson::BufferSerialisation::Base64 :
203 VampJson::BufferSerialisation::Array);
204
205 Json id = writeJsonId(rr.id);
206
207 if (!rr.success) {
208
209 j = VampJson::fromError(rr.errorText, rr.type, id);
210
211 } else {
212
213 switch (rr.type) {
214
215 case RRType::List:
216 j = VampJson::fromRpcResponse_List(rr.listResponse, id);
217 break;
218 case RRType::Load:
219 j = VampJson::fromRpcResponse_Load(rr.loadResponse, mapper, id);
220 break;
221 case RRType::Configure:
222 j = VampJson::fromRpcResponse_Configure(rr.configurationResponse,
223 mapper, id);
224 break;
225 case RRType::Process:
226 j = VampJson::fromRpcResponse_Process
227 (rr.processResponse, mapper, serialisation, id);
228 break;
229 case RRType::Finish:
230 j = VampJson::fromRpcResponse_Finish
231 (rr.finishResponse, mapper, serialisation, id);
232 break;
233 case RRType::NotValid:
234 break;
235 }
236 }
237
238 cout << j.dump() << endl;
239 }
240
241 void
242 writeExceptionJson(const std::exception &e, RRType type)
243 {
244 Json j = VampJson::fromError(e.what(), type, Json());
245 cout << j.dump() << endl;
246 }
247
80 RequestOrResponse 248 RequestOrResponse
81 readRequestCapnp() 249 readRequestCapnp()
82 { 250 {
83 RequestOrResponse rr; 251 RequestOrResponse rr;
84 rr.direction = RequestOrResponse::Request; 252 rr.direction = RequestOrResponse::Request;
85 253
86 static kj::FdInputStream stream(0); // stdin 254 static kj::FdInputStream stream(0); // stdin
87 static kj::BufferedInputStreamWrapper buffered(stream); 255 static kj::BufferedInputStreamWrapper buffered(stream);
88 256
89 if (buffered.tryGetReadBuffer() == nullptr) { 257 if (buffered.tryGetReadBuffer() == nullptr) {
90 rr.type = RRType::NotValid; 258 rr.type = RRType::NotValid;
91 return rr; 259 return rr;
92 } 260 }
93 261
94 capnp::InputStreamMessageReader message(buffered); 262 capnp::InputStreamMessageReader message(buffered);
95 piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>(); 263 piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>();
96 264
98 rr.id = readId(reader); 266 rr.id = readId(reader);
99 267
100 switch (rr.type) { 268 switch (rr.type) {
101 269
102 case RRType::List: 270 case RRType::List:
103 VampnProto::readRpcRequest_List(reader); // type check only 271 VampnProto::readRpcRequest_List(reader); // type check only
104 break; 272 break;
105 case RRType::Load: 273 case RRType::Load:
106 VampnProto::readRpcRequest_Load(rr.loadRequest, reader); 274 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
107 break; 275 break;
108 case RRType::Configure: 276 case RRType::Configure:
109 VampnProto::readRpcRequest_Configure(rr.configurationRequest, 277 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
110 reader, mapper); 278 reader, mapper);
111 break; 279 break;
112 case RRType::Process: 280 case RRType::Process:
113 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper); 281 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
114 break; 282 break;
115 case RRType::Finish: 283 case RRType::Finish:
116 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper); 284 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
117 break; 285 break;
118 case RRType::NotValid: 286 case RRType::NotValid:
119 break; 287 break;
120 } 288 }
121 289
122 return rr; 290 return rr;
123 } 291 }
124 292
130 298
131 buildId(builder, rr.id); 299 buildId(builder, rr.id);
132 300
133 if (!rr.success) { 301 if (!rr.success) {
134 302
135 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type); 303 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
136 304
137 } else { 305 } else {
138 306
139 switch (rr.type) { 307 switch (rr.type) {
140 308
141 case RRType::List: 309 case RRType::List:
142 VampnProto::buildRpcResponse_List(builder, rr.listResponse); 310 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
143 break; 311 break;
144 case RRType::Load: 312 case RRType::Load:
145 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper); 313 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
146 break; 314 break;
147 case RRType::Configure: 315 case RRType::Configure:
148 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper); 316 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
149 break; 317 break;
150 case RRType::Process: 318 case RRType::Process:
151 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper); 319 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
152 break; 320 break;
153 case RRType::Finish: 321 case RRType::Finish:
154 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper); 322 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
155 break; 323 break;
156 case RRType::NotValid: 324 case RRType::NotValid:
157 break; 325 break;
158 } 326 }
159 } 327 }
160 328
161 writeMessageToFd(1, message); 329 writeMessageToFd(1, message);
162 } 330 }
163 331
170 338
171 writeMessageToFd(1, message); 339 writeMessageToFd(1, message);
172 } 340 }
173 341
174 RequestOrResponse 342 RequestOrResponse
175 handleRequest(const RequestOrResponse &request) 343 handleRequest(const RequestOrResponse &request, bool debug)
176 { 344 {
177 RequestOrResponse response; 345 RequestOrResponse response;
178 response.direction = RequestOrResponse::Response; 346 response.direction = RequestOrResponse::Response;
179 response.type = request.type; 347 response.type = request.type;
180 348
181 switch (request.type) { 349 switch (request.type) {
182 350
183 case RRType::List: 351 case RRType::List:
184 response.listResponse = LoaderRequests().listPluginData(); 352 response.listResponse = LoaderRequests().listPluginData();
185 response.success = true; 353 response.success = true;
186 break; 354 break;
187 355
188 case RRType::Load: 356 case RRType::Load:
189 response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest); 357 response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest);
190 if (response.loadResponse.plugin != nullptr) { 358 if (response.loadResponse.plugin != nullptr) {
191 mapper.addPlugin(response.loadResponse.plugin); 359 mapper.addPlugin(response.loadResponse.plugin);
192 cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl; 360 if (debug) {
193 response.success = true; 361 cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl;
194 } 362 }
195 break; 363 response.success = true;
196 364 }
365 break;
366
197 case RRType::Configure: 367 case RRType::Configure:
198 { 368 {
199 auto &creq = request.configurationRequest; 369 auto &creq = request.configurationRequest;
200 auto h = mapper.pluginToHandle(creq.plugin); 370 auto h = mapper.pluginToHandle(creq.plugin);
201 if (mapper.isConfigured(h)) { 371 if (mapper.isConfigured(h)) {
202 throw runtime_error("plugin has already been configured"); 372 throw runtime_error("plugin has already been configured");
203 } 373 }
204 374
205 response.configurationResponse = LoaderRequests().configurePlugin(creq); 375 response.configurationResponse = LoaderRequests().configurePlugin(creq);
206 376
207 if (!response.configurationResponse.outputs.empty()) { 377 if (!response.configurationResponse.outputs.empty()) {
208 mapper.markConfigured 378 mapper.markConfigured
209 (h, creq.configuration.channelCount, creq.configuration.blockSize); 379 (h, creq.configuration.channelCount, creq.configuration.blockSize);
210 response.success = true; 380 response.success = true;
211 } 381 }
212 break; 382 break;
213 } 383 }
214 384
215 case RRType::Process: 385 case RRType::Process:
216 { 386 {
217 auto &preq = request.processRequest; 387 auto &preq = request.processRequest;
218 auto h = mapper.pluginToHandle(preq.plugin); 388 auto h = mapper.pluginToHandle(preq.plugin);
219 if (!mapper.isConfigured(h)) { 389 if (!mapper.isConfigured(h)) {
220 throw runtime_error("plugin has not been configured"); 390 throw runtime_error("plugin has not been configured");
221 } 391 }
222 392
223 int channels = int(preq.inputBuffers.size()); 393 int channels = int(preq.inputBuffers.size());
224 if (channels != mapper.getChannelCount(h)) { 394 if (channels != mapper.getChannelCount(h)) {
225 throw runtime_error("wrong number of channels supplied to process"); 395 throw runtime_error("wrong number of channels supplied to process");
226 } 396 }
227 397
228 const float **fbuffers = new const float *[channels]; 398 const float **fbuffers = new const float *[channels];
229 for (int i = 0; i < channels; ++i) { 399 for (int i = 0; i < channels; ++i) {
230 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) { 400 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
231 delete[] fbuffers; 401 delete[] fbuffers;
232 throw runtime_error("wrong block size supplied to process"); 402 throw runtime_error("wrong block size supplied to process");
233 } 403 }
234 fbuffers[i] = preq.inputBuffers[i].data(); 404 fbuffers[i] = preq.inputBuffers[i].data();
235 } 405 }
236 406
237 response.processResponse.plugin = preq.plugin; 407 response.processResponse.plugin = preq.plugin;
238 response.processResponse.features = 408 response.processResponse.features =
239 preq.plugin->process(fbuffers, preq.timestamp); 409 preq.plugin->process(fbuffers, preq.timestamp);
240 response.success = true; 410 response.success = true;
241 411
242 delete[] fbuffers; 412 delete[] fbuffers;
243 break; 413 break;
244 } 414 }
245 415
246 case RRType::Finish: 416 case RRType::Finish:
247 { 417 {
248 auto &freq = request.finishRequest; 418 auto &freq = request.finishRequest;
249 response.finishResponse.plugin = freq.plugin; 419 response.finishResponse.plugin = freq.plugin;
250 420
251 auto h = mapper.pluginToHandle(freq.plugin); 421 auto h = mapper.pluginToHandle(freq.plugin);
252 // Finish can be called (to unload the plugin) even if the 422 // Finish can be called (to unload the plugin) even if the
253 // plugin has never been configured or used. But we want to 423 // plugin has never been configured or used. But we want to
254 // make sure we call getRemainingFeatures only if we have 424 // make sure we call getRemainingFeatures only if we have
255 // actually configured the plugin. 425 // actually configured the plugin.
256 if (mapper.isConfigured(h)) { 426 if (mapper.isConfigured(h)) {
257 response.finishResponse.features = freq.plugin->getRemainingFeatures(); 427 response.finishResponse.features = freq.plugin->getRemainingFeatures();
258 } 428 }
259 429
260 // We do not delete the plugin here -- we need it in the 430 // We do not delete the plugin here -- we need it in the
261 // mapper when converting the features. It gets deleted in the 431 // mapper when converting the features. It gets deleted in the
262 // calling function. 432 // calling function.
263 response.success = true; 433 response.success = true;
264 break; 434 break;
265 } 435 }
266 436
267 case RRType::NotValid: 437 case RRType::NotValid:
268 break; 438 break;
269 } 439 }
270 440
271 return response; 441 return response;
272 } 442 }
273 443
274 int main(int argc, char **) 444 RequestOrResponse
275 { 445 readRequest(string format)
276 if (argc != 1) { 446 {
277 usage(); 447 if (format == "capnp") {
278 } 448 return readRequestCapnp();
279 449 } else if (format == "json") {
280 cerr << "piper-vamp-server: ready" << endl; 450 string err;
281 451 auto result = readRequestJson(err);
452 if (err != "") throw runtime_error(err);
453 else return result;
454 } else {
455 throw runtime_error("unknown input format \"" + format + "\"");
456 }
457 }
458
459 void
460 writeResponse(string format, RequestOrResponse &rr)
461 {
462 if (format == "capnp") {
463 writeResponseCapnp(rr);
464 } else if (format == "json") {
465 writeResponseJson(rr, false);
466 } else {
467 throw runtime_error("unknown output format \"" + format + "\"");
468 }
469 }
470
471 void
472 writeException(string format, const std::exception &e, RRType type)
473 {
474 if (format == "capnp") {
475 writeExceptionCapnp(e, type);
476 } else if (format == "json") {
477 writeExceptionJson(e, type);
478 } else {
479 throw runtime_error("unknown output format \"" + format + "\"");
480 }
481 }
482
483 int main(int argc, char **argv)
484 {
485 if (argc != 2 && argc != 3) {
486 usage();
487 }
488
489 bool debug = false;
490
491 string arg = argv[1];
492 if (arg == "-h") {
493 if (argc == 2) {
494 usage(true);
495 } else {
496 usage();
497 }
498 } else if (arg == "-v") {
499 if (argc == 2) {
500 version();
501 } else {
502 usage();
503 }
504 } else if (arg == "-d") {
505 if (argc == 2) {
506 usage();
507 } else {
508 debug = true;
509 arg = argv[2];
510 }
511 }
512
513 string format = arg;
514
515 if (format != "capnp" && format != "json") {
516 usage();
517 }
518
519 if (debug) {
520 cerr << myname << " " << pid << ": waiting for format: " << format << endl;
521 }
522
282 while (true) { 523 while (true) {
283 524
284 RequestOrResponse request; 525 RequestOrResponse request;
285 526
286 try { 527 try {
287 528
288 request = readRequestCapnp(); 529 request = readRequest(format);
289 530
290 cerr << "piper-vamp-server " << pid << ": request received, of type " 531 // NotValid without an exception indicates EOF:
291 << int(request.type) 532 if (request.type == RRType::NotValid) {
292 << endl; 533 if (debug) {
293 534 cerr << myname << " " << pid << ": eof reached, exiting" << endl;
294 // NotValid without an exception indicates EOF: 535 }
295 if (request.type == RRType::NotValid) { 536 break;
296 cerr << "piper-vamp-server " << pid << ": eof reached, exiting" << endl; 537 }
297 break; 538
298 } 539 if (debug) {
299 540 cerr << myname << " " << pid << ": request received, of type "
300 RequestOrResponse response = handleRequest(request); 541 << int(request.type)
542 << endl;
543 }
544
545 RequestOrResponse response = handleRequest(request, debug);
301 response.id = request.id; 546 response.id = request.id;
302 547
303 cerr << "piper-vamp-server " << pid << ": request handled, writing response" 548 if (debug) {
304 << endl; 549 cerr << myname << " " << pid << ": request handled, writing response"
305 550 << endl;
306 writeResponseCapnp(response); 551 }
307 552
308 cerr << "piper-vamp-server " << pid << ": response written" << endl; 553 writeResponse(format, response);
309 554
310 if (request.type == RRType::Finish) { 555 if (debug) {
311 auto h = mapper.pluginToHandle(request.finishRequest.plugin); 556 cerr << myname << " " << pid << ": response written" << endl;
312 cerr << "piper-vamp-server " << pid << ": deleting the plugin with handle " << h << endl; 557 }
313 mapper.removePlugin(h); 558
314 delete request.finishRequest.plugin; 559 if (request.type == RRType::Finish) {
315 } 560 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
316 561 if (debug) {
317 } catch (std::exception &e) { 562 cerr << myname << " " << pid << ": deleting the plugin with handle " << h << endl;
318 563 }
319 cerr << "piper-vamp-server " << pid << ": error: " << e.what() << endl; 564 mapper.removePlugin(h);
320 565 delete request.finishRequest.plugin;
321 writeExceptionCapnp(e, request.type); 566 }
322 567
323 exit(1); 568 } catch (std::exception &e) {
324 } 569
570 if (debug) {
571 cerr << myname << " " << pid << ": error: " << e.what() << endl;
572 }
573
574 writeException(format, e, request.type);
575
576 exit(1);
577 }
325 } 578 }
326 579
327 exit(0); 580 exit(0);
328 } 581 }