annotate utilities/vampipe-server.cpp @ 74:d45cfa25aaad

Ensure code is at least set, but just to 0 at the mo
author Chris Cannam <c.cannam@qmul.ac.uk>
date Mon, 10 Oct 2016 15:31:44 +0100
parents 7bfc07576830
children
rev   line source
c@31 1
#include "VampnProto.h"

#include "bits/RequestOrResponse.h"
#include "bits/CountingPluginHandleMapper.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <map>
#include <set>
#include <vector>
c@32 13
c@31 14 using namespace std;
c@31 15 using namespace vampipe;
c@32 16 using namespace Vamp;
c@32 17 using namespace Vamp::HostExt;
c@31 18
/**
 * Print a short usage summary to stderr and terminate the process
 * with exit status 2. This function does not return.
 */
void usage()
{
    const string programName = "vampipe-server";

    cerr << "\n" << programName
         << ": Load and run Vamp plugins in response to messages from stdin\n\n"
         << " Usage: " << programName << "\n\n"
         << "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
         << "and writes Vamp response messages in the same format to stdout.\n\n";

    exit(2);
}
c@31 30
c@40 31 static CountingPluginHandleMapper mapper;
c@32 32
c@73 33 static RequestOrResponse::RpcId readId(const RpcRequest::Reader &r)
c@73 34 {
c@73 35 int number;
c@73 36 string tag;
c@73 37 switch (r.getId().which()) {
c@73 38 case RpcRequest::Id::Which::NUMBER:
c@73 39 number = r.getId().getNumber();
c@73 40 return { RequestOrResponse::RpcId::Number, number, "" };
c@73 41 case RpcRequest::Id::Which::TAG:
c@73 42 tag = r.getId().getTag();
c@73 43 return { RequestOrResponse::RpcId::Tag, 0, tag };
c@73 44 case RpcRequest::Id::Which::NONE:
c@73 45 return { RequestOrResponse::RpcId::Absent, 0, "" };
c@73 46 }
c@73 47 return {};
c@73 48 }
c@73 49
c@73 50 static void buildId(RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
c@73 51 {
c@73 52 switch (id.type) {
c@73 53 case RequestOrResponse::RpcId::Number:
c@73 54 b.getId().setNumber(id.number);
c@73 55 break;
c@73 56 case RequestOrResponse::RpcId::Tag:
c@73 57 b.getId().setTag(id.tag);
c@73 58 break;
c@73 59 case RequestOrResponse::RpcId::Absent:
c@73 60 b.getId().setNone();
c@73 61 break;
c@73 62 }
c@73 63 }
c@73 64
c@31 65 RequestOrResponse
c@31 66 readRequestCapnp()
c@31 67 {
c@31 68 RequestOrResponse rr;
c@31 69 rr.direction = RequestOrResponse::Request;
c@31 70
c@33 71 static kj::FdInputStream stream(0); // stdin
c@33 72 static kj::BufferedInputStreamWrapper buffered(stream);
c@33 73
c@33 74 if (buffered.tryGetReadBuffer() == nullptr) {
c@33 75 rr.type = RRType::NotValid;
c@33 76 return rr;
c@33 77 }
c@33 78
c@33 79 ::capnp::InputStreamMessageReader message(buffered);
c@68 80 RpcRequest::Reader reader = message.getRoot<RpcRequest>();
c@31 81
c@31 82 rr.type = VampnProto::getRequestResponseType(reader);
c@73 83 rr.id = readId(reader);
c@31 84
c@31 85 switch (rr.type) {
c@31 86
c@31 87 case RRType::List:
c@68 88 VampnProto::readRpcRequest_List(reader); // type check only
c@31 89 break;
c@31 90 case RRType::Load:
c@68 91 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
c@31 92 break;
c@31 93 case RRType::Configure:
c@68 94 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
c@32 95 reader, mapper);
c@31 96 break;
c@31 97 case RRType::Process:
c@68 98 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
c@31 99 break;
c@31 100 case RRType::Finish:
c@68 101 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
c@31 102 break;
c@31 103 case RRType::NotValid:
c@31 104 break;
c@31 105 }
c@31 106
c@31 107 return rr;
c@31 108 }
c@31 109
c@31 110 void
c@31 111 writeResponseCapnp(RequestOrResponse &rr)
c@31 112 {
c@31 113 ::capnp::MallocMessageBuilder message;
c@68 114 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
c@31 115
c@73 116 buildId(builder, rr.id);
c@73 117
c@52 118 if (!rr.success) {
c@31 119
c@68 120 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
c@52 121
c@52 122 } else {
c@52 123
c@52 124 switch (rr.type) {
c@52 125
c@52 126 case RRType::List:
c@68 127 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
c@52 128 break;
c@52 129 case RRType::Load:
c@68 130 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
c@52 131 break;
c@52 132 case RRType::Configure:
c@68 133 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
c@52 134 break;
c@52 135 case RRType::Process:
c@68 136 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
c@52 137 break;
c@52 138 case RRType::Finish:
c@68 139 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
c@52 140 break;
c@52 141 case RRType::NotValid:
c@52 142 break;
c@52 143 }
c@31 144 }
c@52 145
c@52 146 writeMessageToFd(1, message);
c@52 147 }
c@31 148
c@52 149 void
c@52 150 writeExceptionCapnp(const std::exception &e, RRType type)
c@52 151 {
c@52 152 ::capnp::MallocMessageBuilder message;
c@68 153 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
c@68 154 VampnProto::buildRpcResponse_Exception(builder, e, type);
c@52 155
c@33 156 writeMessageToFd(1, message);
c@31 157 }
c@31 158
c@31 159 RequestOrResponse
c@34 160 handleRequest(const RequestOrResponse &request)
c@31 161 {
c@31 162 RequestOrResponse response;
c@31 163 response.direction = RequestOrResponse::Response;
c@32 164 response.type = request.type;
c@32 165
c@32 166 auto loader = PluginLoader::getInstance();
c@32 167
c@32 168 switch (request.type) {
c@32 169
c@32 170 case RRType::List:
c@32 171 response.listResponse = loader->listPluginData();
c@32 172 response.success = true;
c@32 173 break;
c@32 174
c@32 175 case RRType::Load:
c@32 176 response.loadResponse = loader->loadPlugin(request.loadRequest);
c@32 177 if (response.loadResponse.plugin != nullptr) {
c@32 178 mapper.addPlugin(response.loadResponse.plugin);
c@32 179 response.success = true;
c@32 180 }
c@32 181 break;
c@32 182
c@33 183 case RRType::Configure:
c@33 184 {
c@34 185 auto &creq = request.configurationRequest;
c@34 186 auto h = mapper.pluginToHandle(creq.plugin);
c@33 187 if (mapper.isConfigured(h)) {
c@33 188 throw runtime_error("plugin has already been configured");
c@33 189 }
c@33 190
c@34 191 response.configurationResponse = loader->configurePlugin(creq);
c@33 192
c@33 193 if (!response.configurationResponse.outputs.empty()) {
c@34 194 mapper.markConfigured
c@34 195 (h, creq.configuration.channelCount, creq.configuration.blockSize);
c@33 196 response.success = true;
c@33 197 }
c@33 198 break;
c@33 199 }
c@33 200
c@33 201 case RRType::Process:
c@33 202 {
c@33 203 auto &preq = request.processRequest;
c@34 204 auto h = mapper.pluginToHandle(preq.plugin);
c@34 205 if (!mapper.isConfigured(h)) {
c@34 206 throw runtime_error("plugin has not been configured");
c@34 207 }
c@34 208
c@33 209 int channels = int(preq.inputBuffers.size());
c@34 210 if (channels != mapper.getChannelCount(h)) {
c@34 211 throw runtime_error("wrong number of channels supplied to process");
c@34 212 }
c@34 213
c@33 214 const float **fbuffers = new const float *[channels];
c@33 215 for (int i = 0; i < channels; ++i) {
c@34 216 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
c@34 217 delete[] fbuffers;
c@34 218 throw runtime_error("wrong block size supplied to process");
c@34 219 }
c@33 220 fbuffers[i] = preq.inputBuffers[i].data();
c@33 221 }
c@33 222
c@52 223 response.processResponse.plugin = preq.plugin;
c@33 224 response.processResponse.features =
c@33 225 preq.plugin->process(fbuffers, preq.timestamp);
c@33 226 response.success = true;
c@33 227
c@33 228 delete[] fbuffers;
c@33 229 break;
c@33 230 }
c@33 231
c@33 232 case RRType::Finish:
c@33 233 {
c@55 234 response.finishResponse.plugin = request.finishRequest.plugin;
c@33 235 response.finishResponse.features =
c@55 236 request.finishRequest.plugin->getRemainingFeatures();
c@52 237
c@52 238 // We do not delete the plugin here -- we need it in the
c@52 239 // mapper when converting the features. It gets deleted by the
c@52 240 // caller.
c@52 241
c@33 242 response.success = true;
c@33 243 break;
c@33 244 }
c@33 245
c@33 246 case RRType::NotValid:
c@33 247 break;
c@32 248 }
c@32 249
c@31 250 return response;
c@31 251 }
c@31 252
c@31 253 int main(int argc, char **argv)
c@31 254 {
c@31 255 if (argc != 1) {
c@31 256 usage();
c@31 257 }
c@31 258
c@31 259 while (true) {
c@31 260
c@52 261 RequestOrResponse request;
c@52 262
c@31 263 try {
c@31 264
c@52 265 request = readRequestCapnp();
c@31 266
c@33 267 cerr << "vampipe-server: request received, of type "
c@33 268 << int(request.type)
c@33 269 << endl;
c@33 270
c@31 271 // NotValid without an exception indicates EOF:
c@33 272 if (request.type == RRType::NotValid) {
c@33 273 cerr << "vampipe-server: eof reached" << endl;
c@33 274 break;
c@33 275 }
c@31 276
c@34 277 RequestOrResponse response = handleRequest(request);
c@73 278 response.id = request.id;
c@33 279
c@34 280 cerr << "vampipe-server: request handled, writing response"
c@33 281 << endl;
c@31 282
c@31 283 writeResponseCapnp(response);
c@33 284
c@33 285 cerr << "vampipe-server: response written" << endl;
c@52 286
c@52 287 if (request.type == RRType::Finish) {
c@55 288 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
c@52 289 mapper.removePlugin(h);
c@55 290 delete request.finishRequest.plugin;
c@52 291 }
c@31 292
c@31 293 } catch (std::exception &e) {
c@52 294
c@33 295 cerr << "vampipe-server: error: " << e.what() << endl;
c@52 296
c@52 297 writeExceptionCapnp(e, request.type);
c@52 298
c@31 299 exit(1);
c@31 300 }
c@31 301 }
c@31 302
c@31 303 exit(0);
c@31 304 }