annotate vamp-server/simple-server.cpp @ 186:52322dde68ea

Fix erroneous logic for handling step and block size in prior commit. The earlier change had a logical misconception. If PluginStub is receiving the correct step and block size back from the configure call, the plugin on the server side must have already been successfully initialised, as the step and block size are only returned in a successful configure response. This means the test for a failed initialise and redo with the correct parameters must be done on the server side (in LoaderRequests) not the client. The client has a more complicated job, which is to notice that a *successful* configure had returned different framing parameters from those passed to the initialise call, and to pretend that it had actually failed until the host called again with the correct parameters. We definitely need tests for this!
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 06 Feb 2017 16:44:33 +0000
parents 3eb00e5c76c4
children e0e3d9efa774
rev   line source
c@125 1 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*- vi:set ts=8 sts=4 sw=4: */
c@125 2 /*
c@125 3 Piper C++
c@125 4
c@125 5 An API for audio analysis and feature extraction plugins.
c@125 6
c@125 7 Centre for Digital Music, Queen Mary, University of London.
c@125 8 Copyright 2006-2016 Chris Cannam and QMUL.
c@125 9
c@125 10 Permission is hereby granted, free of charge, to any person
c@125 11 obtaining a copy of this software and associated documentation
c@125 12 files (the "Software"), to deal in the Software without
c@125 13 restriction, including without limitation the rights to use, copy,
c@125 14 modify, merge, publish, distribute, sublicense, and/or sell copies
c@125 15 of the Software, and to permit persons to whom the Software is
c@125 16 furnished to do so, subject to the following conditions:
c@125 17
c@125 18 The above copyright notice and this permission notice shall be
c@125 19 included in all copies or substantial portions of the Software.
c@125 20
c@125 21 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
c@125 22 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
c@125 23 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
c@125 24 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
c@125 25 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
c@125 26 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
c@125 27 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
c@125 28
c@125 29 Except as contained in this notice, the names of the Centre for
c@125 30 Digital Music; Queen Mary, University of London; and Chris Cannam
c@125 31 shall not be used in advertising or otherwise to promote the sale,
c@125 32 use or other dealings in this Software without prior written
c@125 33 authorization.
c@125 34 */
c@125 35
c@125 36 #include "vamp-json/VampJson.h"
c@125 37 #include "vamp-capnp/VampnProto.h"
c@125 38 #include "vamp-support/RequestOrResponse.h"
c@125 39 #include "vamp-support/CountingPluginHandleMapper.h"
c@125 40 #include "vamp-support/LoaderRequests.h"
c@125 41
#include <cstdio>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <vector>

#include <capnp/serialize.h>
c@125 50
c@125 51 // pid for logging
c@125 52 #ifdef _WIN32
c@125 53 #include <process.h>
c@125 54 static int pid = _getpid();
c@125 55 #else
c@125 56 #include <unistd.h>
c@125 57 static int pid = getpid();
c@125 58 #endif
c@125 59
c@138 60 // for _setmode stuff and _dup
c@125 61 #ifdef _WIN32
c@125 62 #include <io.h>
c@125 63 #include <fcntl.h>
c@125 64 #endif
c@125 65
c@138 66 // for dup, open etc
c@138 67 #ifndef _WIN32
c@138 68 #include <fcntl.h>
c@138 69 #include <unistd.h>
c@138 70 #endif
c@138 71
c@125 72 using namespace std;
c@125 73 using namespace json11;
c@125 74 using namespace piper_vamp;
c@125 75 using namespace Vamp;
c@125 76
c@125 77 static string myname = "piper-vamp-simple-server";
c@125 78
c@125 79 static void version()
c@125 80 {
c@125 81 cout << "1.0" << endl;
c@125 82 exit(0);
c@125 83 }
c@125 84
c@125 85 static void usage(bool successful = false)
c@125 86 {
c@125 87 cerr << "\n" << myname <<
c@125 88 ": Load & run Vamp plugins in response to Piper messages\n\n"
c@125 89 " Usage: " << myname << " [-d] <format>\n"
c@125 90 " " << myname << " -v\n"
c@125 91 " " << myname << " -h\n\n"
c@125 92 " where\n"
c@125 93 " <format>: the format to read and write messages in (\"json\" or \"capnp\")\n"
c@125 94 " -d: also print debug information to stderr\n"
c@125 95 " -v: print version number to stdout and exit\n"
c@125 96 " -h: print this text to stderr and exit\n\n"
c@125 97 "Expects Piper request messages in either Cap'n Proto or JSON format on stdin,\n"
c@125 98 "and writes response messages in the same format to stdout.\n\n"
c@125 99 "This server is intended for simple process separation. It's only suitable for\n"
c@125 100 "use with a single trusted client per server invocation.\n\n"
c@125 101 "The two formats behave differently in case of parser errors. JSON messages are\n"
c@125 102 "expected one per input line; because the JSON support is really intended for\n"
c@125 103 "interactive troubleshooting, any unparseable message is reported and discarded\n"
c@125 104 "and the server waits for another message. In contrast, because of the assumption\n"
c@125 105 "that the client is trusted and coupled to the server instance, a mangled\n"
c@125 106 "Cap'n Proto message causes the server to exit.\n\n";
c@125 107 if (successful) exit(0);
c@125 108 else exit(2);
c@125 109 }
c@125 110
c@125 111 static CountingPluginHandleMapper mapper;
c@125 112
c@138 113 // We write our output to stdout, but want to ensure that the plugin
c@138 114 // doesn't write anything itself. To do this we open a null file
c@138 115 // descriptor and dup2() it into place of stdout in the gaps between
c@138 116 // our own output activity.
c@138 117
c@138 118 static int normalFd = -1;
c@138 119 static int suspendedFd = -1;
c@138 120
c@138 121 static void initFds(bool binary)
c@138 122 {
c@138 123 #ifdef _WIN32
c@138 124 if (binary) {
c@138 125 int result = _setmode(0, _O_BINARY);
c@138 126 if (result == -1) {
c@138 127 throw runtime_error("Failed to set binary mode on stdin");
c@138 128 }
c@138 129 result = _setmode(1, _O_BINARY);
c@138 130 if (result == -1) {
c@138 131 throw runtime_error("Failed to set binary mode on stdout");
c@138 132 }
c@138 133 }
c@138 134 normalFd = _dup(1);
c@138 135 suspendedFd = _open("NUL", _O_WRONLY);
c@138 136 #else
c@141 137 (void)binary;
c@138 138 normalFd = dup(1);
c@138 139 suspendedFd = open("/dev/null", O_WRONLY);
c@138 140 #endif
c@138 141
c@138 142 if (normalFd < 0 || suspendedFd < 0) {
c@138 143 throw runtime_error("Failed to initialise fds for stdio suspend/resume");
c@138 144 }
c@138 145 }
c@138 146
c@138 147 static void suspendOutput()
c@138 148 {
c@138 149 #ifdef _WIN32
c@138 150 _dup2(suspendedFd, 1);
c@138 151 #else
c@138 152 dup2(suspendedFd, 1);
c@138 153 #endif
c@138 154 }
c@138 155
c@138 156 static void resumeOutput()
c@138 157 {
c@138 158 #ifdef _WIN32
c@138 159 _dup2(normalFd, 1);
c@138 160 #else
c@138 161 dup2(normalFd, 1);
c@138 162 #endif
c@138 163 }
c@138 164
c@125 165 static RequestOrResponse::RpcId
c@125 166 readId(const piper::RpcRequest::Reader &r)
c@125 167 {
c@125 168 int number;
c@125 169 string tag;
c@125 170 switch (r.getId().which()) {
c@125 171 case piper::RpcRequest::Id::Which::NUMBER:
c@125 172 number = r.getId().getNumber();
c@125 173 return { RequestOrResponse::RpcId::Number, number, "" };
c@125 174 case piper::RpcRequest::Id::Which::TAG:
c@125 175 tag = r.getId().getTag();
c@125 176 return { RequestOrResponse::RpcId::Tag, 0, tag };
c@125 177 case piper::RpcRequest::Id::Which::NONE:
c@125 178 return { RequestOrResponse::RpcId::Absent, 0, "" };
c@125 179 }
cannam@154 180 return { RequestOrResponse::RpcId::Absent, 0, "" };
c@125 181 }
c@125 182
c@125 183 static void
c@125 184 buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
c@125 185 {
c@125 186 switch (id.type) {
c@125 187 case RequestOrResponse::RpcId::Number:
c@125 188 b.getId().setNumber(id.number);
c@125 189 break;
c@125 190 case RequestOrResponse::RpcId::Tag:
c@125 191 b.getId().setTag(id.tag);
c@125 192 break;
c@125 193 case RequestOrResponse::RpcId::Absent:
c@125 194 b.getId().setNone();
c@125 195 break;
c@125 196 }
c@125 197 }
c@125 198
c@125 199 static RequestOrResponse::RpcId
c@125 200 readJsonId(const Json &j)
c@125 201 {
c@125 202 RequestOrResponse::RpcId id;
c@125 203
c@125 204 if (j["id"].is_number()) {
c@125 205 id.type = RequestOrResponse::RpcId::Number;
c@140 206 id.number = int(round(j["id"].number_value()));
c@125 207 } else if (j["id"].is_string()) {
c@125 208 id.type = RequestOrResponse::RpcId::Tag;
c@125 209 id.tag = j["id"].string_value();
c@125 210 } else {
c@125 211 id.type = RequestOrResponse::RpcId::Absent;
c@125 212 }
c@125 213
c@125 214 return id;
c@125 215 }
c@125 216
c@125 217 static Json
c@125 218 writeJsonId(const RequestOrResponse::RpcId &id)
c@125 219 {
c@125 220 if (id.type == RequestOrResponse::RpcId::Number) {
c@125 221 return id.number;
c@125 222 } else if (id.type == RequestOrResponse::RpcId::Tag) {
c@125 223 return id.tag;
c@125 224 } else {
c@125 225 return Json();
c@125 226 }
c@125 227 }
c@125 228
c@125 229 static Json
c@125 230 convertRequestJson(string input, string &err)
c@125 231 {
c@125 232 Json j = Json::parse(input, err);
c@125 233 if (err != "") {
c@125 234 err = "invalid json: " + err;
c@125 235 return {};
c@125 236 }
c@125 237 if (!j.is_object()) {
c@125 238 err = "object expected at top level";
c@125 239 } else if (!j["method"].is_string()) {
c@125 240 err = "string expected for method field";
c@125 241 } else if (!j["params"].is_null() && !j["params"].is_object()) {
c@125 242 err = "object expected for params field";
c@125 243 }
c@125 244 return j;
c@125 245 }
c@125 246
c@125 247 RequestOrResponse
cannam@158 248 readRequestJson(string &err, bool &eof)
c@125 249 {
c@125 250 RequestOrResponse rr;
c@125 251 rr.direction = RequestOrResponse::Request;
c@125 252
c@125 253 string input;
c@125 254 if (!getline(cin, input)) {
c@125 255 // the EOF case, not actually an error
cannam@158 256 eof = true;
c@125 257 return rr;
c@125 258 }
c@125 259
c@125 260 Json j = convertRequestJson(input, err);
c@125 261 if (err != "") return {};
c@125 262
c@125 263 rr.type = VampJson::getRequestResponseType(j, err);
c@125 264 if (err != "") return {};
c@125 265
c@125 266 rr.id = readJsonId(j);
c@125 267
c@125 268 VampJson::BufferSerialisation serialisation =
c@125 269 VampJson::BufferSerialisation::Array;
c@125 270
c@125 271 switch (rr.type) {
c@125 272
c@125 273 case RRType::List:
c@130 274 rr.listRequest = VampJson::toRpcRequest_List(j, err);
c@125 275 break;
c@125 276 case RRType::Load:
c@125 277 rr.loadRequest = VampJson::toRpcRequest_Load(j, err);
c@125 278 break;
c@125 279 case RRType::Configure:
c@125 280 rr.configurationRequest = VampJson::toRpcRequest_Configure(j, mapper, err);
c@125 281 break;
c@125 282 case RRType::Process:
c@125 283 rr.processRequest = VampJson::toRpcRequest_Process(j, mapper, serialisation, err);
c@125 284 break;
c@125 285 case RRType::Finish:
c@125 286 rr.finishRequest = VampJson::toRpcRequest_Finish(j, mapper, err);
c@125 287 break;
c@125 288 case RRType::NotValid:
c@125 289 break;
c@125 290 }
c@125 291
c@125 292 return rr;
c@125 293 }
c@125 294
c@125 295 void
c@125 296 writeResponseJson(RequestOrResponse &rr, bool useBase64)
c@125 297 {
c@125 298 Json j;
c@125 299
c@125 300 VampJson::BufferSerialisation serialisation =
c@125 301 (useBase64 ?
c@125 302 VampJson::BufferSerialisation::Base64 :
c@125 303 VampJson::BufferSerialisation::Array);
c@125 304
c@125 305 Json id = writeJsonId(rr.id);
c@125 306
c@125 307 if (!rr.success) {
c@125 308
c@125 309 j = VampJson::fromError(rr.errorText, rr.type, id);
c@125 310
c@125 311 } else {
c@125 312
c@125 313 switch (rr.type) {
c@125 314
c@125 315 case RRType::List:
c@125 316 j = VampJson::fromRpcResponse_List(rr.listResponse, id);
c@125 317 break;
c@125 318 case RRType::Load:
c@125 319 j = VampJson::fromRpcResponse_Load(rr.loadResponse, mapper, id);
c@125 320 break;
c@125 321 case RRType::Configure:
c@125 322 j = VampJson::fromRpcResponse_Configure(rr.configurationResponse,
c@125 323 mapper, id);
c@125 324 break;
c@125 325 case RRType::Process:
c@125 326 j = VampJson::fromRpcResponse_Process
c@125 327 (rr.processResponse, mapper, serialisation, id);
c@125 328 break;
c@125 329 case RRType::Finish:
c@125 330 j = VampJson::fromRpcResponse_Finish
c@125 331 (rr.finishResponse, mapper, serialisation, id);
c@125 332 break;
c@125 333 case RRType::NotValid:
c@125 334 break;
c@125 335 }
c@125 336 }
c@138 337
c@125 338 cout << j.dump() << endl;
c@125 339 }
c@125 340
c@125 341 void
cannam@158 342 writeExceptionJson(const exception &e, RRType type, RequestOrResponse::RpcId id)
c@125 343 {
cannam@158 344 Json jid = writeJsonId(id);
cannam@158 345 Json j = VampJson::fromError(e.what(), type, jid);
c@125 346 cout << j.dump() << endl;
c@125 347 }
c@125 348
c@125 349 RequestOrResponse
cannam@158 350 readRequestCapnp(bool &eof)
c@125 351 {
c@125 352 RequestOrResponse rr;
c@125 353 rr.direction = RequestOrResponse::Request;
c@125 354
c@125 355 static kj::FdInputStream stream(0); // stdin
c@125 356 static kj::BufferedInputStreamWrapper buffered(stream);
c@125 357
c@125 358 if (buffered.tryGetReadBuffer() == nullptr) {
cannam@158 359 eof = true;
c@125 360 return rr;
c@125 361 }
c@125 362
c@125 363 capnp::InputStreamMessageReader message(buffered);
c@125 364 piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>();
c@125 365
c@125 366 rr.type = VampnProto::getRequestResponseType(reader);
c@125 367 rr.id = readId(reader);
c@125 368
c@125 369 switch (rr.type) {
c@125 370
c@125 371 case RRType::List:
c@127 372 VampnProto::readRpcRequest_List(rr.listRequest, reader);
c@125 373 break;
c@125 374 case RRType::Load:
c@125 375 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
c@125 376 break;
c@125 377 case RRType::Configure:
c@125 378 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
c@125 379 reader, mapper);
c@125 380 break;
c@125 381 case RRType::Process:
c@125 382 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
c@125 383 break;
c@125 384 case RRType::Finish:
c@125 385 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
c@125 386 break;
c@125 387 case RRType::NotValid:
c@125 388 break;
c@125 389 }
c@125 390
c@125 391 return rr;
c@125 392 }
c@125 393
c@125 394 void
c@125 395 writeResponseCapnp(RequestOrResponse &rr)
c@125 396 {
c@125 397 capnp::MallocMessageBuilder message;
c@125 398 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
c@125 399
c@125 400 buildId(builder, rr.id);
c@125 401
c@125 402 if (!rr.success) {
c@125 403
c@125 404 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
c@125 405
c@125 406 } else {
c@125 407
c@125 408 switch (rr.type) {
c@125 409
c@125 410 case RRType::List:
c@125 411 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
c@125 412 break;
c@125 413 case RRType::Load:
c@125 414 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
c@125 415 break;
c@125 416 case RRType::Configure:
c@125 417 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
c@125 418 break;
c@125 419 case RRType::Process:
c@125 420 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
c@125 421 break;
c@125 422 case RRType::Finish:
c@125 423 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
c@125 424 break;
c@125 425 case RRType::NotValid:
c@125 426 break;
c@125 427 }
c@125 428 }
c@125 429
c@125 430 writeMessageToFd(1, message);
c@125 431 }
c@125 432
c@125 433 void
cannam@158 434 writeExceptionCapnp(const exception &e, RRType type, RequestOrResponse::RpcId id)
c@125 435 {
c@125 436 capnp::MallocMessageBuilder message;
c@125 437 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
cannam@158 438
cannam@158 439 buildId(builder, id);
c@125 440 VampnProto::buildRpcResponse_Exception(builder, e, type);
c@125 441
c@125 442 writeMessageToFd(1, message);
c@125 443 }
c@125 444
c@125 445 RequestOrResponse
c@125 446 handleRequest(const RequestOrResponse &request, bool debug)
c@125 447 {
c@125 448 RequestOrResponse response;
c@125 449 response.direction = RequestOrResponse::Response;
c@125 450 response.type = request.type;
c@125 451
c@125 452 switch (request.type) {
c@125 453
c@125 454 case RRType::List:
c@127 455 response.listResponse =
c@127 456 LoaderRequests().listPluginData(request.listRequest);
c@125 457 response.success = true;
c@125 458 break;
c@125 459
c@125 460 case RRType::Load:
c@127 461 response.loadResponse =
c@127 462 LoaderRequests().loadPlugin(request.loadRequest);
c@125 463 if (response.loadResponse.plugin != nullptr) {
c@125 464 mapper.addPlugin(response.loadResponse.plugin);
c@125 465 if (debug) {
c@127 466 cerr << "piper-vamp-server " << pid
c@127 467 << ": loaded plugin, handle = "
c@127 468 << mapper.pluginToHandle(response.loadResponse.plugin)
c@127 469 << endl;
c@125 470 }
c@125 471 response.success = true;
c@125 472 }
c@125 473 break;
c@125 474
c@125 475 case RRType::Configure:
c@125 476 {
c@125 477 auto &creq = request.configurationRequest;
cannam@158 478 if (!creq.plugin) {
cannam@158 479 throw runtime_error("unknown plugin handle supplied to configure");
cannam@158 480 }
cannam@158 481
c@125 482 auto h = mapper.pluginToHandle(creq.plugin);
c@125 483 if (mapper.isConfigured(h)) {
c@125 484 throw runtime_error("plugin has already been configured");
c@125 485 }
c@125 486
cannam@185 487 if (creq.configuration.framing.stepSize == 0 ||
cannam@185 488 creq.configuration.framing.blockSize == 0) {
cannam@185 489 throw runtime_error("step and block size must be non-zero");
cannam@185 490 }
cannam@185 491
c@125 492 response.configurationResponse = LoaderRequests().configurePlugin(creq);
c@125 493
c@125 494 if (!response.configurationResponse.outputs.empty()) {
c@125 495 mapper.markConfigured
cannam@185 496 (h,
cannam@185 497 creq.configuration.channelCount,
cannam@186 498 response.configurationResponse.framing.blockSize);
c@125 499 response.success = true;
c@125 500 }
c@125 501 break;
c@125 502 }
c@125 503
c@125 504 case RRType::Process:
c@125 505 {
c@125 506 auto &preq = request.processRequest;
cannam@158 507 if (!preq.plugin) {
cannam@158 508 throw runtime_error("unknown plugin handle supplied to process");
cannam@158 509 }
cannam@158 510
c@125 511 auto h = mapper.pluginToHandle(preq.plugin);
c@125 512 if (!mapper.isConfigured(h)) {
c@125 513 throw runtime_error("plugin has not been configured");
c@125 514 }
c@125 515
c@125 516 int channels = int(preq.inputBuffers.size());
c@125 517 if (channels != mapper.getChannelCount(h)) {
c@125 518 throw runtime_error("wrong number of channels supplied to process");
c@125 519 }
c@125 520
c@125 521 const float **fbuffers = new const float *[channels];
c@125 522 for (int i = 0; i < channels; ++i) {
c@125 523 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
cannam@186 524 ostringstream os;
cannam@186 525 os << "wrong block size supplied to process ("
cannam@186 526 << preq.inputBuffers[i].size()
cannam@186 527 << ", expecting " << mapper.getBlockSize(h) << ")" << ends;
c@125 528 delete[] fbuffers;
cannam@186 529 throw runtime_error(os.str());
c@125 530 }
c@125 531 fbuffers[i] = preq.inputBuffers[i].data();
c@125 532 }
c@125 533
c@125 534 response.processResponse.plugin = preq.plugin;
c@125 535 response.processResponse.features =
c@125 536 preq.plugin->process(fbuffers, preq.timestamp);
c@125 537 response.success = true;
c@125 538
c@125 539 delete[] fbuffers;
c@125 540 break;
c@125 541 }
c@125 542
c@125 543 case RRType::Finish:
c@125 544 {
c@125 545 auto &freq = request.finishRequest;
cannam@158 546 if (!freq.plugin) {
cannam@158 547 throw runtime_error("unknown plugin handle supplied to finish");
cannam@158 548 }
cannam@158 549
c@125 550 response.finishResponse.plugin = freq.plugin;
c@125 551
c@125 552 auto h = mapper.pluginToHandle(freq.plugin);
c@125 553 // Finish can be called (to unload the plugin) even if the
c@125 554 // plugin has never been configured or used. But we want to
c@125 555 // make sure we call getRemainingFeatures only if we have
c@125 556 // actually configured the plugin.
c@125 557 if (mapper.isConfigured(h)) {
c@125 558 response.finishResponse.features = freq.plugin->getRemainingFeatures();
c@125 559 }
c@125 560
c@125 561 // We do not delete the plugin here -- we need it in the
c@125 562 // mapper when converting the features. It gets deleted in the
c@125 563 // calling function.
c@125 564 response.success = true;
c@125 565 break;
c@125 566 }
c@125 567
c@125 568 case RRType::NotValid:
c@125 569 break;
c@125 570 }
c@125 571
c@125 572 return response;
c@125 573 }
c@125 574
c@125 575 RequestOrResponse
cannam@158 576 readRequest(string format, bool &eof)
c@125 577 {
c@125 578 if (format == "capnp") {
cannam@158 579 return readRequestCapnp(eof);
c@125 580 } else if (format == "json") {
c@125 581 string err;
cannam@158 582 auto result = readRequestJson(err, eof);
c@125 583 if (err != "") throw runtime_error(err);
c@125 584 else return result;
c@125 585 } else {
c@125 586 throw runtime_error("unknown input format \"" + format + "\"");
c@125 587 }
c@125 588 }
c@125 589
c@125 590 void
c@125 591 writeResponse(string format, RequestOrResponse &rr)
c@125 592 {
c@138 593 resumeOutput();
c@125 594 if (format == "capnp") {
c@125 595 writeResponseCapnp(rr);
c@125 596 } else if (format == "json") {
c@125 597 writeResponseJson(rr, false);
c@125 598 } else {
c@125 599 throw runtime_error("unknown output format \"" + format + "\"");
c@125 600 }
c@138 601 suspendOutput();
c@125 602 }
c@125 603
c@125 604 void
cannam@158 605 writeException(string format, const exception &e, RRType type, RequestOrResponse::RpcId id)
c@125 606 {
c@138 607 resumeOutput();
c@125 608 if (format == "capnp") {
cannam@158 609 writeExceptionCapnp(e, type, id);
c@125 610 } else if (format == "json") {
cannam@158 611 writeExceptionJson(e, type, id);
c@125 612 } else {
c@125 613 throw runtime_error("unknown output format \"" + format + "\"");
c@125 614 }
c@138 615 suspendOutput();
c@125 616 }
c@125 617
c@125 618 int main(int argc, char **argv)
c@125 619 {
c@125 620 if (argc != 2 && argc != 3) {
c@125 621 usage();
c@125 622 }
c@125 623
c@125 624 bool debug = false;
c@125 625
c@125 626 string arg = argv[1];
c@125 627 if (arg == "-h") {
c@125 628 if (argc == 2) {
c@125 629 usage(true);
c@125 630 } else {
c@125 631 usage();
c@125 632 }
c@125 633 } else if (arg == "-v") {
c@125 634 if (argc == 2) {
c@125 635 version();
c@125 636 } else {
c@125 637 usage();
c@125 638 }
c@125 639 } else if (arg == "-d") {
c@125 640 if (argc == 2) {
c@125 641 usage();
c@125 642 } else {
c@125 643 debug = true;
c@125 644 arg = argv[2];
c@125 645 }
c@125 646 }
c@125 647
c@125 648 string format = arg;
c@125 649
c@125 650 if (format != "capnp" && format != "json") {
c@125 651 usage();
c@125 652 }
c@125 653
c@138 654 try {
c@138 655 initFds(format == "capnp");
c@138 656 } catch (exception &e) {
c@138 657 cerr << "ERROR: " << e.what() << endl;
c@138 658 exit(1);
c@125 659 }
c@138 660
c@138 661 suspendOutput();
c@125 662
c@125 663 if (debug) {
c@125 664 cerr << myname << " " << pid << ": waiting for format: " << format << endl;
c@125 665 }
c@138 666
c@125 667 while (true) {
c@125 668
c@125 669 RequestOrResponse request;
c@125 670
c@125 671 try {
c@125 672
cannam@158 673 bool eof = false;
cannam@158 674 request = readRequest(format, eof);
c@125 675
cannam@158 676 if (eof) {
c@125 677 if (debug) {
c@125 678 cerr << myname << " " << pid << ": eof reached, exiting" << endl;
c@125 679 }
c@125 680 break;
c@125 681 }
c@125 682
c@125 683 if (debug) {
c@125 684 cerr << myname << " " << pid << ": request received, of type "
c@125 685 << int(request.type)
c@125 686 << endl;
c@125 687 }
cannam@158 688
cannam@158 689 } catch (exception &e) {
c@125 690
cannam@158 691 if (debug) {
cannam@158 692 cerr << myname << " " << pid << ": error: " << e.what() << endl;
cannam@158 693 }
cannam@158 694
cannam@158 695 writeException(format, e, request.type, request.id);
cannam@158 696
cannam@158 697 if (format == "capnp") {
cannam@158 698 // Don't try to continue; we can't recover from a
cannam@158 699 // mangled input stream. However, we can return a
cannam@158 700 // successful error code because we are reporting the
cannam@158 701 // status in our Capnp output stream instead
cannam@158 702 if (debug) {
cannam@158 703 cerr << myname << " " << pid << ": not attempting to recover from capnp parse problems, exiting" << endl;
cannam@158 704 }
cannam@158 705 exit(0);
cannam@158 706 }
cannam@158 707 }
cannam@158 708
cannam@158 709 try {
c@125 710 RequestOrResponse response = handleRequest(request, debug);
c@125 711 response.id = request.id;
c@125 712
c@125 713 if (debug) {
c@125 714 cerr << myname << " " << pid << ": request handled, writing response"
c@125 715 << endl;
c@125 716 }
c@125 717
c@125 718 writeResponse(format, response);
c@125 719
c@125 720 if (debug) {
c@125 721 cerr << myname << " " << pid << ": response written" << endl;
c@125 722 }
c@125 723
c@125 724 if (request.type == RRType::Finish) {
c@125 725 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
c@125 726 if (debug) {
c@125 727 cerr << myname << " " << pid << ": deleting the plugin with handle " << h << endl;
c@125 728 }
c@125 729 mapper.removePlugin(h);
c@125 730 delete request.finishRequest.plugin;
c@125 731 }
c@125 732
c@138 733 } catch (exception &e) {
c@125 734
c@125 735 if (debug) {
c@125 736 cerr << myname << " " << pid << ": error: " << e.what() << endl;
c@125 737 }
c@125 738
cannam@158 739 writeException(format, e, request.type, request.id);
c@125 740 }
c@125 741 }
c@125 742
c@125 743 exit(0);
c@125 744 }