annotate utilities/vampipe-server.cpp @ 66:6f160dee1192

Instead of using separate values and b64values entries in JSON serialisations, allow numeric arrays to be replaced by b64 variants wherever they appear (discriminating by type). Also rename values to featureValues in feature throughout, as values turns out to be a hazardous name in a JS context. Finally use Array instead of Text for array encoding (seems clearer).
author Chris Cannam <c.cannam@qmul.ac.uk>
date Tue, 27 Sep 2016 15:04:59 +0100
parents 815e94fedc1c
children a5ba837bca28
rev   line source
c@31 1
c@31 2 #include "VampnProto.h"
c@31 3
c@31 4 #include "bits/RequestOrResponse.h"
c@40 5 #include "bits/CountingPluginHandleMapper.h"
c@31 6
c@31 7 #include <iostream>
c@31 8 #include <sstream>
c@31 9 #include <stdexcept>
c@31 10
c@32 11 #include <map>
c@32 12 #include <set>
c@32 13
c@31 14 using namespace std;
c@31 15 using namespace vampipe;
c@32 16 using namespace Vamp;
c@32 17 using namespace Vamp::HostExt;
c@31 18
c@31 19 void usage()
c@31 20 {
c@31 21 string myname = "vampipe-server";
c@31 22 cerr << "\n" << myname <<
c@31 23 ": Load and run Vamp plugins in response to messages from stdin\n\n"
c@31 24 " Usage: " << myname << "\n\n"
c@31 25 "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
c@31 26 "and writes Vamp response messages in the same format to stdout.\n\n";
c@31 27
c@31 28 exit(2);
c@31 29 }
c@31 30
c@40 31 static CountingPluginHandleMapper mapper;
c@32 32
c@31 33 RequestOrResponse
c@31 34 readRequestCapnp()
c@31 35 {
c@31 36 RequestOrResponse rr;
c@31 37 rr.direction = RequestOrResponse::Request;
c@31 38
c@33 39 static kj::FdInputStream stream(0); // stdin
c@33 40 static kj::BufferedInputStreamWrapper buffered(stream);
c@33 41
c@33 42 if (buffered.tryGetReadBuffer() == nullptr) {
c@33 43 rr.type = RRType::NotValid;
c@33 44 return rr;
c@33 45 }
c@33 46
c@33 47 ::capnp::InputStreamMessageReader message(buffered);
c@31 48 VampRequest::Reader reader = message.getRoot<VampRequest>();
c@31 49
c@31 50 rr.type = VampnProto::getRequestResponseType(reader);
c@31 51
c@31 52 switch (rr.type) {
c@31 53
c@31 54 case RRType::List:
c@31 55 VampnProto::readVampRequest_List(reader); // type check only
c@31 56 break;
c@31 57 case RRType::Load:
c@31 58 VampnProto::readVampRequest_Load(rr.loadRequest, reader);
c@31 59 break;
c@31 60 case RRType::Configure:
c@32 61 VampnProto::readVampRequest_Configure(rr.configurationRequest,
c@32 62 reader, mapper);
c@31 63 break;
c@31 64 case RRType::Process:
c@32 65 VampnProto::readVampRequest_Process(rr.processRequest, reader, mapper);
c@31 66 break;
c@31 67 case RRType::Finish:
c@55 68 VampnProto::readVampRequest_Finish(rr.finishRequest, reader, mapper);
c@31 69 break;
c@31 70 case RRType::NotValid:
c@31 71 break;
c@31 72 }
c@31 73
c@31 74 return rr;
c@31 75 }
c@31 76
c@31 77 void
c@31 78 writeResponseCapnp(RequestOrResponse &rr)
c@31 79 {
c@31 80 ::capnp::MallocMessageBuilder message;
c@31 81 VampResponse::Builder builder = message.initRoot<VampResponse>();
c@31 82
c@52 83 if (!rr.success) {
c@31 84
c@52 85 VampnProto::buildVampResponse_Error(builder, rr.errorText, rr.type);
c@52 86
c@52 87 } else {
c@52 88
c@52 89 switch (rr.type) {
c@52 90
c@52 91 case RRType::List:
c@56 92 VampnProto::buildVampResponse_List(builder, rr.listResponse);
c@52 93 break;
c@52 94 case RRType::Load:
c@52 95 VampnProto::buildVampResponse_Load(builder, rr.loadResponse, mapper);
c@52 96 break;
c@52 97 case RRType::Configure:
c@55 98 VampnProto::buildVampResponse_Configure(builder, rr.configurationResponse, mapper);
c@52 99 break;
c@52 100 case RRType::Process:
c@52 101 VampnProto::buildVampResponse_Process(builder, rr.processResponse, mapper);
c@52 102 break;
c@52 103 case RRType::Finish:
c@52 104 VampnProto::buildVampResponse_Finish(builder, rr.finishResponse, mapper);
c@52 105 break;
c@52 106 case RRType::NotValid:
c@52 107 break;
c@52 108 }
c@31 109 }
c@52 110
c@52 111 writeMessageToFd(1, message);
c@52 112 }
c@31 113
c@52 114 void
c@52 115 writeExceptionCapnp(const std::exception &e, RRType type)
c@52 116 {
c@52 117 ::capnp::MallocMessageBuilder message;
c@52 118 VampResponse::Builder builder = message.initRoot<VampResponse>();
c@52 119 VampnProto::buildVampResponse_Exception(builder, e, type);
c@52 120
c@33 121 writeMessageToFd(1, message);
c@31 122 }
c@31 123
c@31 124 RequestOrResponse
c@34 125 handleRequest(const RequestOrResponse &request)
c@31 126 {
c@31 127 RequestOrResponse response;
c@31 128 response.direction = RequestOrResponse::Response;
c@32 129 response.type = request.type;
c@32 130
c@32 131 auto loader = PluginLoader::getInstance();
c@32 132
c@32 133 switch (request.type) {
c@32 134
c@32 135 case RRType::List:
c@32 136 response.listResponse = loader->listPluginData();
c@32 137 response.success = true;
c@32 138 break;
c@32 139
c@32 140 case RRType::Load:
c@32 141 response.loadResponse = loader->loadPlugin(request.loadRequest);
c@32 142 if (response.loadResponse.plugin != nullptr) {
c@32 143 mapper.addPlugin(response.loadResponse.plugin);
c@32 144 response.success = true;
c@32 145 }
c@32 146 break;
c@32 147
c@33 148 case RRType::Configure:
c@33 149 {
c@34 150 auto &creq = request.configurationRequest;
c@34 151 auto h = mapper.pluginToHandle(creq.plugin);
c@33 152 if (mapper.isConfigured(h)) {
c@33 153 throw runtime_error("plugin has already been configured");
c@33 154 }
c@33 155
c@34 156 response.configurationResponse = loader->configurePlugin(creq);
c@33 157
c@33 158 if (!response.configurationResponse.outputs.empty()) {
c@34 159 mapper.markConfigured
c@34 160 (h, creq.configuration.channelCount, creq.configuration.blockSize);
c@33 161 response.success = true;
c@33 162 }
c@33 163 break;
c@33 164 }
c@33 165
c@33 166 case RRType::Process:
c@33 167 {
c@33 168 auto &preq = request.processRequest;
c@34 169 auto h = mapper.pluginToHandle(preq.plugin);
c@34 170 if (!mapper.isConfigured(h)) {
c@34 171 throw runtime_error("plugin has not been configured");
c@34 172 }
c@34 173
c@33 174 int channels = int(preq.inputBuffers.size());
c@34 175 if (channels != mapper.getChannelCount(h)) {
c@34 176 throw runtime_error("wrong number of channels supplied to process");
c@34 177 }
c@34 178
c@33 179 const float **fbuffers = new const float *[channels];
c@33 180 for (int i = 0; i < channels; ++i) {
c@34 181 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
c@34 182 delete[] fbuffers;
c@34 183 throw runtime_error("wrong block size supplied to process");
c@34 184 }
c@33 185 fbuffers[i] = preq.inputBuffers[i].data();
c@33 186 }
c@33 187
c@52 188 response.processResponse.plugin = preq.plugin;
c@33 189 response.processResponse.features =
c@33 190 preq.plugin->process(fbuffers, preq.timestamp);
c@33 191 response.success = true;
c@33 192
c@33 193 delete[] fbuffers;
c@33 194 break;
c@33 195 }
c@33 196
c@33 197 case RRType::Finish:
c@33 198 {
c@55 199 response.finishResponse.plugin = request.finishRequest.plugin;
c@33 200 response.finishResponse.features =
c@55 201 request.finishRequest.plugin->getRemainingFeatures();
c@52 202
c@52 203 // We do not delete the plugin here -- we need it in the
c@52 204 // mapper when converting the features. It gets deleted by the
c@52 205 // caller.
c@52 206
c@33 207 response.success = true;
c@33 208 break;
c@33 209 }
c@33 210
c@33 211 case RRType::NotValid:
c@33 212 break;
c@32 213 }
c@32 214
c@31 215 return response;
c@31 216 }
c@31 217
c@31 218 int main(int argc, char **argv)
c@31 219 {
c@31 220 if (argc != 1) {
c@31 221 usage();
c@31 222 }
c@31 223
c@31 224 while (true) {
c@31 225
c@52 226 RequestOrResponse request;
c@52 227
c@31 228 try {
c@31 229
c@52 230 request = readRequestCapnp();
c@31 231
c@33 232 cerr << "vampipe-server: request received, of type "
c@33 233 << int(request.type)
c@33 234 << endl;
c@33 235
c@31 236 // NotValid without an exception indicates EOF:
c@33 237 if (request.type == RRType::NotValid) {
c@33 238 cerr << "vampipe-server: eof reached" << endl;
c@33 239 break;
c@33 240 }
c@31 241
c@34 242 RequestOrResponse response = handleRequest(request);
c@33 243
c@34 244 cerr << "vampipe-server: request handled, writing response"
c@33 245 << endl;
c@31 246
c@31 247 writeResponseCapnp(response);
c@33 248
c@33 249 cerr << "vampipe-server: response written" << endl;
c@52 250
c@52 251 if (request.type == RRType::Finish) {
c@55 252 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
c@52 253 mapper.removePlugin(h);
c@55 254 delete request.finishRequest.plugin;
c@52 255 }
c@31 256
c@31 257 } catch (std::exception &e) {
c@52 258
c@33 259 cerr << "vampipe-server: error: " << e.what() << endl;
c@52 260
c@52 261 writeExceptionCapnp(e, request.type);
c@52 262
c@31 263 exit(1);
c@31 264 }
c@31 265 }
c@31 266
c@31 267 exit(0);
c@31 268 }