comparison vamp-server/simple-server.cpp @ 125:ea06fae1567c

Rename server to simple-server, and add some more description in usage
author Chris Cannam <c.cannam@qmul.ac.uk>
date Fri, 28 Oct 2016 11:08:17 +0100
parents
children 5b113c87b6e6
comparison
equal deleted inserted replaced
124:4ff643c1eccc 125:ea06fae1567c
1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
2 /*
3 Piper C++
4
5 An API for audio analysis and feature extraction plugins.
6
7 Centre for Digital Music, Queen Mary, University of London.
8 Copyright 2006-2016 Chris Cannam and QMUL.
9
10 Permission is hereby granted, free of charge, to any person
11 obtaining a copy of this software and associated documentation
12 files (the "Software"), to deal in the Software without
13 restriction, including without limitation the rights to use, copy,
14 modify, merge, publish, distribute, sublicense, and/or sell copies
15 of the Software, and to permit persons to whom the Software is
16 furnished to do so, subject to the following conditions:
17
18 The above copyright notice and this permission notice shall be
19 included in all copies or substantial portions of the Software.
20
21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28
29 Except as contained in this notice, the names of the Centre for
30 Digital Music; Queen Mary, University of London; and Chris Cannam
31 shall not be used in advertising or otherwise to promote the sale,
32 use or other dealings in this Software without prior written
33 authorization.
34 */
35
#include "vamp-json/VampJson.h"
#include "vamp-capnp/VampnProto.h"
#include "vamp-support/RequestOrResponse.h"
#include "vamp-support/CountingPluginHandleMapper.h"
#include "vamp-support/LoaderRequests.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <capnp/serialize.h>

#include <map>
#include <set>
#include <vector>
50
51 // pid for logging
52 #ifdef _WIN32
53 #include <process.h>
54 static int pid = _getpid();
55 #else
56 #include <unistd.h>
57 static int pid = getpid();
58 #endif
59
60 // for _setmode stuff
61 #ifdef _WIN32
62 #include <io.h>
63 #include <fcntl.h>
64 #endif
65
66 using namespace std;
67 using namespace json11;
68 using namespace piper_vamp;
69 using namespace Vamp;
70
71 //!!! This could be faster and lighter:
72 // - Use Capnp structures directly rather than converting to vamp-support ones
73 // - Use Vamp C API (vamp.h) directly rather than converting to C++
74 //!!! Doing the above for process() and finish() alone would be a good start
75
76 static string myname = "piper-vamp-simple-server";
77
// Print the server version number on stdout and terminate the
// process with exit status 0. Does not return.
static void version()
{
    std::cout << "1.0" << std::endl;
    exit(0);
}
83
84 static void usage(bool successful = false)
85 {
86 cerr << "\n" << myname <<
87 ": Load & run Vamp plugins in response to Piper messages\n\n"
88 " Usage: " << myname << " [-d] <format>\n"
89 " " << myname << " -v\n"
90 " " << myname << " -h\n\n"
91 " where\n"
92 " <format>: the format to read and write messages in (\"json\" or \"capnp\")\n"
93 " -d: also print debug information to stderr\n"
94 " -v: print version number to stdout and exit\n"
95 " -h: print this text to stderr and exit\n\n"
96 "Expects Piper request messages in either Cap'n Proto or JSON format on stdin,\n"
97 "and writes response messages in the same format to stdout.\n\n"
98 "This server is intended for simple process separation. It's only suitable for\n"
99 "use with a single trusted client per server invocation.\n\n"
100 "The two formats behave differently in case of parser errors. JSON messages are\n"
101 "expected one per input line; because the JSON support is really intended for\n"
102 "interactive troubleshooting, any unparseable message is reported and discarded\n"
103 "and the server waits for another message. In contrast, because of the assumption\n"
104 "that the client is trusted and coupled to the server instance, a mangled\n"
105 "Cap'n Proto message causes the server to exit.\n\n";
106 if (successful) exit(0);
107 else exit(2);
108 }
109
110 static CountingPluginHandleMapper mapper;
111
112 static RequestOrResponse::RpcId
113 readId(const piper::RpcRequest::Reader &r)
114 {
115 int number;
116 string tag;
117 switch (r.getId().which()) {
118 case piper::RpcRequest::Id::Which::NUMBER:
119 number = r.getId().getNumber();
120 return { RequestOrResponse::RpcId::Number, number, "" };
121 case piper::RpcRequest::Id::Which::TAG:
122 tag = r.getId().getTag();
123 return { RequestOrResponse::RpcId::Tag, 0, tag };
124 case piper::RpcRequest::Id::Which::NONE:
125 return { RequestOrResponse::RpcId::Absent, 0, "" };
126 }
127 return {};
128 }
129
130 static void
131 buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
132 {
133 switch (id.type) {
134 case RequestOrResponse::RpcId::Number:
135 b.getId().setNumber(id.number);
136 break;
137 case RequestOrResponse::RpcId::Tag:
138 b.getId().setTag(id.tag);
139 break;
140 case RequestOrResponse::RpcId::Absent:
141 b.getId().setNone();
142 break;
143 }
144 }
145
146 static RequestOrResponse::RpcId
147 readJsonId(const Json &j)
148 {
149 RequestOrResponse::RpcId id;
150
151 if (j["id"].is_number()) {
152 id.type = RequestOrResponse::RpcId::Number;
153 id.number = j["id"].number_value();
154 } else if (j["id"].is_string()) {
155 id.type = RequestOrResponse::RpcId::Tag;
156 id.tag = j["id"].string_value();
157 } else {
158 id.type = RequestOrResponse::RpcId::Absent;
159 }
160
161 return id;
162 }
163
164 static Json
165 writeJsonId(const RequestOrResponse::RpcId &id)
166 {
167 if (id.type == RequestOrResponse::RpcId::Number) {
168 return id.number;
169 } else if (id.type == RequestOrResponse::RpcId::Tag) {
170 return id.tag;
171 } else {
172 return Json();
173 }
174 }
175
176 static Json
177 convertRequestJson(string input, string &err)
178 {
179 Json j = Json::parse(input, err);
180 if (err != "") {
181 err = "invalid json: " + err;
182 return {};
183 }
184 if (!j.is_object()) {
185 err = "object expected at top level";
186 } else if (!j["method"].is_string()) {
187 err = "string expected for method field";
188 } else if (!j["params"].is_null() && !j["params"].is_object()) {
189 err = "object expected for params field";
190 }
191 return j;
192 }
193
194 RequestOrResponse
195 readRequestJson(string &err)
196 {
197 RequestOrResponse rr;
198 rr.direction = RequestOrResponse::Request;
199
200 string input;
201 if (!getline(cin, input)) {
202 // the EOF case, not actually an error
203 rr.type = RRType::NotValid;
204 return rr;
205 }
206
207 Json j = convertRequestJson(input, err);
208 if (err != "") return {};
209
210 rr.type = VampJson::getRequestResponseType(j, err);
211 if (err != "") return {};
212
213 rr.id = readJsonId(j);
214
215 VampJson::BufferSerialisation serialisation =
216 VampJson::BufferSerialisation::Array;
217
218 switch (rr.type) {
219
220 case RRType::List:
221 VampJson::toRpcRequest_List(j, err); // type check only
222 break;
223 case RRType::Load:
224 rr.loadRequest = VampJson::toRpcRequest_Load(j, err);
225 break;
226 case RRType::Configure:
227 rr.configurationRequest = VampJson::toRpcRequest_Configure(j, mapper, err);
228 break;
229 case RRType::Process:
230 rr.processRequest = VampJson::toRpcRequest_Process(j, mapper, serialisation, err);
231 break;
232 case RRType::Finish:
233 rr.finishRequest = VampJson::toRpcRequest_Finish(j, mapper, err);
234 break;
235 case RRType::NotValid:
236 break;
237 }
238
239 return rr;
240 }
241
242 void
243 writeResponseJson(RequestOrResponse &rr, bool useBase64)
244 {
245 Json j;
246
247 VampJson::BufferSerialisation serialisation =
248 (useBase64 ?
249 VampJson::BufferSerialisation::Base64 :
250 VampJson::BufferSerialisation::Array);
251
252 Json id = writeJsonId(rr.id);
253
254 if (!rr.success) {
255
256 j = VampJson::fromError(rr.errorText, rr.type, id);
257
258 } else {
259
260 switch (rr.type) {
261
262 case RRType::List:
263 j = VampJson::fromRpcResponse_List(rr.listResponse, id);
264 break;
265 case RRType::Load:
266 j = VampJson::fromRpcResponse_Load(rr.loadResponse, mapper, id);
267 break;
268 case RRType::Configure:
269 j = VampJson::fromRpcResponse_Configure(rr.configurationResponse,
270 mapper, id);
271 break;
272 case RRType::Process:
273 j = VampJson::fromRpcResponse_Process
274 (rr.processResponse, mapper, serialisation, id);
275 break;
276 case RRType::Finish:
277 j = VampJson::fromRpcResponse_Finish
278 (rr.finishResponse, mapper, serialisation, id);
279 break;
280 case RRType::NotValid:
281 break;
282 }
283 }
284
285 cout << j.dump() << endl;
286 }
287
288 void
289 writeExceptionJson(const std::exception &e, RRType type)
290 {
291 Json j = VampJson::fromError(e.what(), type, Json());
292 cout << j.dump() << endl;
293 }
294
295 RequestOrResponse
296 readRequestCapnp()
297 {
298 RequestOrResponse rr;
299 rr.direction = RequestOrResponse::Request;
300
301 static kj::FdInputStream stream(0); // stdin
302 static kj::BufferedInputStreamWrapper buffered(stream);
303
304 if (buffered.tryGetReadBuffer() == nullptr) {
305 rr.type = RRType::NotValid;
306 return rr;
307 }
308
309 capnp::InputStreamMessageReader message(buffered);
310 piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>();
311
312 rr.type = VampnProto::getRequestResponseType(reader);
313 rr.id = readId(reader);
314
315 switch (rr.type) {
316
317 case RRType::List:
318 VampnProto::readRpcRequest_List(reader); // type check only
319 break;
320 case RRType::Load:
321 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
322 break;
323 case RRType::Configure:
324 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
325 reader, mapper);
326 break;
327 case RRType::Process:
328 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
329 break;
330 case RRType::Finish:
331 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
332 break;
333 case RRType::NotValid:
334 break;
335 }
336
337 return rr;
338 }
339
340 void
341 writeResponseCapnp(RequestOrResponse &rr)
342 {
343 capnp::MallocMessageBuilder message;
344 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
345
346 buildId(builder, rr.id);
347
348 if (!rr.success) {
349
350 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
351
352 } else {
353
354 switch (rr.type) {
355
356 case RRType::List:
357 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
358 break;
359 case RRType::Load:
360 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
361 break;
362 case RRType::Configure:
363 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
364 break;
365 case RRType::Process:
366 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
367 break;
368 case RRType::Finish:
369 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
370 break;
371 case RRType::NotValid:
372 break;
373 }
374 }
375
376 writeMessageToFd(1, message);
377 }
378
379 void
380 writeExceptionCapnp(const std::exception &e, RRType type)
381 {
382 capnp::MallocMessageBuilder message;
383 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
384 VampnProto::buildRpcResponse_Exception(builder, e, type);
385
386 writeMessageToFd(1, message);
387 }
388
389 RequestOrResponse
390 handleRequest(const RequestOrResponse &request, bool debug)
391 {
392 RequestOrResponse response;
393 response.direction = RequestOrResponse::Response;
394 response.type = request.type;
395
396 switch (request.type) {
397
398 case RRType::List:
399 response.listResponse = LoaderRequests().listPluginData();
400 response.success = true;
401 break;
402
403 case RRType::Load:
404 response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest);
405 if (response.loadResponse.plugin != nullptr) {
406 mapper.addPlugin(response.loadResponse.plugin);
407 if (debug) {
408 cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl;
409 }
410 response.success = true;
411 }
412 break;
413
414 case RRType::Configure:
415 {
416 auto &creq = request.configurationRequest;
417 auto h = mapper.pluginToHandle(creq.plugin);
418 if (mapper.isConfigured(h)) {
419 throw runtime_error("plugin has already been configured");
420 }
421
422 response.configurationResponse = LoaderRequests().configurePlugin(creq);
423
424 if (!response.configurationResponse.outputs.empty()) {
425 mapper.markConfigured
426 (h, creq.configuration.channelCount, creq.configuration.blockSize);
427 response.success = true;
428 }
429 break;
430 }
431
432 case RRType::Process:
433 {
434 auto &preq = request.processRequest;
435 auto h = mapper.pluginToHandle(preq.plugin);
436 if (!mapper.isConfigured(h)) {
437 throw runtime_error("plugin has not been configured");
438 }
439
440 int channels = int(preq.inputBuffers.size());
441 if (channels != mapper.getChannelCount(h)) {
442 throw runtime_error("wrong number of channels supplied to process");
443 }
444
445 const float **fbuffers = new const float *[channels];
446 for (int i = 0; i < channels; ++i) {
447 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
448 delete[] fbuffers;
449 throw runtime_error("wrong block size supplied to process");
450 }
451 fbuffers[i] = preq.inputBuffers[i].data();
452 }
453
454 response.processResponse.plugin = preq.plugin;
455 response.processResponse.features =
456 preq.plugin->process(fbuffers, preq.timestamp);
457 response.success = true;
458
459 delete[] fbuffers;
460 break;
461 }
462
463 case RRType::Finish:
464 {
465 auto &freq = request.finishRequest;
466 response.finishResponse.plugin = freq.plugin;
467
468 auto h = mapper.pluginToHandle(freq.plugin);
469 // Finish can be called (to unload the plugin) even if the
470 // plugin has never been configured or used. But we want to
471 // make sure we call getRemainingFeatures only if we have
472 // actually configured the plugin.
473 if (mapper.isConfigured(h)) {
474 response.finishResponse.features = freq.plugin->getRemainingFeatures();
475 }
476
477 // We do not delete the plugin here -- we need it in the
478 // mapper when converting the features. It gets deleted in the
479 // calling function.
480 response.success = true;
481 break;
482 }
483
484 case RRType::NotValid:
485 break;
486 }
487
488 return response;
489 }
490
491 RequestOrResponse
492 readRequest(string format)
493 {
494 if (format == "capnp") {
495 return readRequestCapnp();
496 } else if (format == "json") {
497 string err;
498 auto result = readRequestJson(err);
499 if (err != "") throw runtime_error(err);
500 else return result;
501 } else {
502 throw runtime_error("unknown input format \"" + format + "\"");
503 }
504 }
505
506 void
507 writeResponse(string format, RequestOrResponse &rr)
508 {
509 if (format == "capnp") {
510 writeResponseCapnp(rr);
511 } else if (format == "json") {
512 writeResponseJson(rr, false);
513 } else {
514 throw runtime_error("unknown output format \"" + format + "\"");
515 }
516 }
517
518 void
519 writeException(string format, const std::exception &e, RRType type)
520 {
521 if (format == "capnp") {
522 writeExceptionCapnp(e, type);
523 } else if (format == "json") {
524 writeExceptionJson(e, type);
525 } else {
526 throw runtime_error("unknown output format \"" + format + "\"");
527 }
528 }
529
530 int main(int argc, char **argv)
531 {
532 if (argc != 2 && argc != 3) {
533 usage();
534 }
535
536 bool debug = false;
537
538 string arg = argv[1];
539 if (arg == "-h") {
540 if (argc == 2) {
541 usage(true);
542 } else {
543 usage();
544 }
545 } else if (arg == "-v") {
546 if (argc == 2) {
547 version();
548 } else {
549 usage();
550 }
551 } else if (arg == "-d") {
552 if (argc == 2) {
553 usage();
554 } else {
555 debug = true;
556 arg = argv[2];
557 }
558 }
559
560 string format = arg;
561
562 if (format != "capnp" && format != "json") {
563 usage();
564 }
565
566 #ifdef _WIN32
567 if (format == "capnp") {
568 int result = _setmode(_fileno(stdin), _O_BINARY);
569 if (result == -1) {
570 cerr << "Failed to set binary mode on stdin, necessary for capnp format" << endl;
571 exit(1);
572 }
573 result = _setmode(_fileno(stdout), _O_BINARY);
574 if (result == -1) {
575 cerr << "Failed to set binary mode on stdout, necessary for capnp format" << endl;
576 exit(1);
577 }
578 }
579 #endif
580
581 if (debug) {
582 cerr << myname << " " << pid << ": waiting for format: " << format << endl;
583 }
584
585 while (true) {
586
587 RequestOrResponse request;
588
589 try {
590
591 request = readRequest(format);
592
593 // NotValid without an exception indicates EOF:
594 if (request.type == RRType::NotValid) {
595 if (debug) {
596 cerr << myname << " " << pid << ": eof reached, exiting" << endl;
597 }
598 break;
599 }
600
601 if (debug) {
602 cerr << myname << " " << pid << ": request received, of type "
603 << int(request.type)
604 << endl;
605 }
606
607 RequestOrResponse response = handleRequest(request, debug);
608 response.id = request.id;
609
610 if (debug) {
611 cerr << myname << " " << pid << ": request handled, writing response"
612 << endl;
613 }
614
615 writeResponse(format, response);
616
617 if (debug) {
618 cerr << myname << " " << pid << ": response written" << endl;
619 }
620
621 if (request.type == RRType::Finish) {
622 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
623 if (debug) {
624 cerr << myname << " " << pid << ": deleting the plugin with handle " << h << endl;
625 }
626 mapper.removePlugin(h);
627 delete request.finishRequest.plugin;
628 }
629
630 } catch (std::exception &e) {
631
632 if (debug) {
633 cerr << myname << " " << pid << ": error: " << e.what() << endl;
634 }
635
636 writeException(format, e, request.type);
637
638 if (format == "capnp") {
639 // Don't try to continue; we can't recover from a
640 // mangled input stream. However, we can return a
641 // successful error code because we are reporting the
642 // status in our Capnp output stream instead
643 if (debug) {
644 cerr << myname << " " << pid << ": not attempting to recover from capnp parse problems, exiting" << endl;
645 }
646 exit(0);
647 }
648 }
649 }
650
651 exit(0);
652 }