annotate utilities/vampipe-server.cpp @ 34:ba58fe5ee2dd

Check channel count & block size
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 25 May 2016 11:09:18 +0100
parents 0b48b10140bb
children 55d69b26d4db
rev   line source
c@31 1
#include "VampnProto.h"

#include "bits/RequestOrResponse.h"

#include <cstdlib>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <vector>
c@32 12
c@31 13 using namespace std;
c@31 14 using namespace vampipe;
c@32 15 using namespace Vamp;
c@32 16 using namespace Vamp::HostExt;
c@31 17
c@31 18 void usage()
c@31 19 {
c@31 20 string myname = "vampipe-server";
c@31 21 cerr << "\n" << myname <<
c@31 22 ": Load and run Vamp plugins in response to messages from stdin\n\n"
c@31 23 " Usage: " << myname << "\n\n"
c@31 24 "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
c@31 25 "and writes Vamp response messages in the same format to stdout.\n\n";
c@31 26
c@31 27 exit(2);
c@31 28 }
c@31 29
c@32 30 class Mapper : public PluginHandleMapper
c@32 31 {
c@32 32 public:
c@32 33 Mapper() : m_nextHandle(1) { }
c@32 34
c@32 35 void addPlugin(Plugin *p) {
c@32 36 if (m_rplugins.find(p) == m_rplugins.end()) {
c@32 37 int32_t h = m_nextHandle++;
c@32 38 m_plugins[h] = p;
c@32 39 m_rplugins[p] = h;
c@32 40 }
c@32 41 }
c@33 42
c@33 43 void removePlugin(int32_t h) {
c@33 44 if (m_plugins.find(h) == m_plugins.end()) {
c@33 45 throw NotFound();
c@33 46 }
c@33 47 Plugin *p = m_plugins[h];
c@33 48 m_plugins.erase(h);
c@34 49 if (isConfigured(h)) {
c@34 50 m_configuredPlugins.erase(h);
c@34 51 m_channelCounts.erase(h);
c@34 52 }
c@33 53 m_rplugins.erase(p);
c@33 54 }
c@32 55
c@32 56 int32_t pluginToHandle(Plugin *p) {
c@32 57 if (m_rplugins.find(p) == m_rplugins.end()) {
c@32 58 throw NotFound();
c@32 59 }
c@32 60 return m_rplugins[p];
c@32 61 }
c@32 62
c@32 63 Plugin *handleToPlugin(int32_t h) {
c@32 64 if (m_plugins.find(h) == m_plugins.end()) {
c@32 65 throw NotFound();
c@32 66 }
c@32 67 return m_plugins[h];
c@32 68 }
c@32 69
c@33 70 bool isConfigured(int32_t h) {
c@33 71 return m_configuredPlugins.find(h) != m_configuredPlugins.end();
c@32 72 }
c@32 73
c@34 74 void markConfigured(int32_t h, int channelCount, int blockSize) {
c@33 75 m_configuredPlugins.insert(h);
c@34 76 m_channelCounts[h] = channelCount;
c@34 77 m_blockSizes[h] = blockSize;
c@34 78 }
c@34 79
c@34 80 int getChannelCount(int32_t h) {
c@34 81 if (m_channelCounts.find(h) == m_channelCounts.end()) {
c@34 82 throw NotFound();
c@34 83 }
c@34 84 return m_channelCounts[h];
c@34 85 }
c@34 86
c@34 87 int getBlockSize(int32_t h) {
c@34 88 if (m_blockSizes.find(h) == m_blockSizes.end()) {
c@34 89 throw NotFound();
c@34 90 }
c@34 91 return m_blockSizes[h];
c@32 92 }
c@32 93
c@32 94 private:
c@32 95 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number
c@32 96 map<uint32_t, Plugin *> m_plugins;
c@32 97 map<Plugin *, uint32_t> m_rplugins;
c@33 98 set<uint32_t> m_configuredPlugins;
c@34 99 map<uint32_t, int> m_channelCounts;
c@34 100 map<uint32_t, int> m_blockSizes;
c@32 101 };
c@32 102
c@32 103 static Mapper mapper;
c@32 104
c@31 105 RequestOrResponse
c@31 106 readRequestCapnp()
c@31 107 {
c@31 108 RequestOrResponse rr;
c@31 109 rr.direction = RequestOrResponse::Request;
c@31 110
c@33 111 static kj::FdInputStream stream(0); // stdin
c@33 112 static kj::BufferedInputStreamWrapper buffered(stream);
c@33 113
c@33 114 if (buffered.tryGetReadBuffer() == nullptr) {
c@33 115 rr.type = RRType::NotValid;
c@33 116 return rr;
c@33 117 }
c@33 118
c@33 119 ::capnp::InputStreamMessageReader message(buffered);
c@31 120 VampRequest::Reader reader = message.getRoot<VampRequest>();
c@31 121
c@31 122 rr.type = VampnProto::getRequestResponseType(reader);
c@31 123
c@31 124 switch (rr.type) {
c@31 125
c@31 126 case RRType::List:
c@31 127 VampnProto::readVampRequest_List(reader); // type check only
c@31 128 break;
c@31 129 case RRType::Load:
c@31 130 VampnProto::readVampRequest_Load(rr.loadRequest, reader);
c@31 131 break;
c@31 132 case RRType::Configure:
c@32 133 VampnProto::readVampRequest_Configure(rr.configurationRequest,
c@32 134 reader, mapper);
c@31 135 break;
c@31 136 case RRType::Process:
c@32 137 VampnProto::readVampRequest_Process(rr.processRequest, reader, mapper);
c@31 138 break;
c@31 139 case RRType::Finish:
c@32 140 VampnProto::readVampRequest_Finish(rr.finishPlugin, reader, mapper);
c@31 141 break;
c@31 142 case RRType::NotValid:
c@31 143 break;
c@31 144 }
c@31 145
c@31 146 return rr;
c@31 147 }
c@31 148
c@31 149 void
c@31 150 writeResponseCapnp(RequestOrResponse &rr)
c@31 151 {
c@31 152 ::capnp::MallocMessageBuilder message;
c@31 153 VampResponse::Builder builder = message.initRoot<VampResponse>();
c@31 154
c@31 155 switch (rr.type) {
c@31 156
c@31 157 case RRType::List:
c@31 158 VampnProto::buildVampResponse_List(builder, "", rr.listResponse);
c@31 159 break;
c@31 160 case RRType::Load:
c@32 161 VampnProto::buildVampResponse_Load(builder, rr.loadResponse, mapper);
c@31 162 break;
c@31 163 case RRType::Configure:
c@31 164 VampnProto::buildVampResponse_Configure(builder, rr.configurationResponse);
c@31 165 break;
c@31 166 case RRType::Process:
c@31 167 VampnProto::buildVampResponse_Process(builder, rr.processResponse);
c@31 168 break;
c@31 169 case RRType::Finish:
c@31 170 VampnProto::buildVampResponse_Finish(builder, rr.finishResponse);
c@31 171 break;
c@31 172 case RRType::NotValid:
c@31 173 break;
c@31 174 }
c@31 175
c@33 176 writeMessageToFd(1, message);
c@31 177 }
c@31 178
c@31 179 RequestOrResponse
c@34 180 handleRequest(const RequestOrResponse &request)
c@31 181 {
c@31 182 RequestOrResponse response;
c@31 183 response.direction = RequestOrResponse::Response;
c@32 184 response.type = request.type;
c@32 185
c@32 186 auto loader = PluginLoader::getInstance();
c@32 187
c@32 188 switch (request.type) {
c@32 189
c@32 190 case RRType::List:
c@32 191 response.listResponse = loader->listPluginData();
c@32 192 response.success = true;
c@32 193 break;
c@32 194
c@32 195 case RRType::Load:
c@32 196 response.loadResponse = loader->loadPlugin(request.loadRequest);
c@32 197 if (response.loadResponse.plugin != nullptr) {
c@32 198 mapper.addPlugin(response.loadResponse.plugin);
c@32 199 response.success = true;
c@32 200 }
c@32 201 break;
c@32 202
c@33 203 case RRType::Configure:
c@33 204 {
c@34 205 auto &creq = request.configurationRequest;
c@34 206 auto h = mapper.pluginToHandle(creq.plugin);
c@33 207 if (mapper.isConfigured(h)) {
c@33 208 throw runtime_error("plugin has already been configured");
c@33 209 }
c@33 210
c@34 211 response.configurationResponse = loader->configurePlugin(creq);
c@33 212
c@33 213 if (!response.configurationResponse.outputs.empty()) {
c@34 214 mapper.markConfigured
c@34 215 (h, creq.configuration.channelCount, creq.configuration.blockSize);
c@33 216 response.success = true;
c@33 217 }
c@33 218 break;
c@33 219 }
c@33 220
c@33 221 case RRType::Process:
c@33 222 {
c@33 223 auto &preq = request.processRequest;
c@34 224 auto h = mapper.pluginToHandle(preq.plugin);
c@34 225 if (!mapper.isConfigured(h)) {
c@34 226 throw runtime_error("plugin has not been configured");
c@34 227 }
c@34 228
c@33 229 int channels = int(preq.inputBuffers.size());
c@34 230 if (channels != mapper.getChannelCount(h)) {
c@34 231 throw runtime_error("wrong number of channels supplied to process");
c@34 232 }
c@34 233
c@33 234 const float **fbuffers = new const float *[channels];
c@33 235 for (int i = 0; i < channels; ++i) {
c@34 236 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
c@34 237 delete[] fbuffers;
c@34 238 throw runtime_error("wrong block size supplied to process");
c@34 239 }
c@33 240 fbuffers[i] = preq.inputBuffers[i].data();
c@33 241 }
c@33 242
c@33 243 response.processResponse.features =
c@33 244 preq.plugin->process(fbuffers, preq.timestamp);
c@33 245 response.success = true;
c@33 246
c@33 247 delete[] fbuffers;
c@33 248 break;
c@33 249 }
c@33 250
c@33 251 case RRType::Finish:
c@33 252 {
c@33 253 auto h = mapper.pluginToHandle(request.finishPlugin);
c@33 254
c@33 255 response.finishResponse.features =
c@33 256 request.finishPlugin->getRemainingFeatures();
c@33 257
c@33 258 mapper.removePlugin(h);
c@33 259 delete request.finishPlugin;
c@33 260 response.success = true;
c@33 261 break;
c@33 262 }
c@33 263
c@33 264 case RRType::NotValid:
c@33 265 break;
c@32 266 }
c@32 267
c@31 268 return response;
c@31 269 }
c@31 270
c@31 271 int main(int argc, char **argv)
c@31 272 {
c@31 273 if (argc != 1) {
c@31 274 usage();
c@31 275 }
c@31 276
c@31 277 while (true) {
c@31 278
c@31 279 try {
c@31 280
c@31 281 RequestOrResponse request = readRequestCapnp();
c@31 282
c@33 283 cerr << "vampipe-server: request received, of type "
c@33 284 << int(request.type)
c@33 285 << endl;
c@33 286
c@31 287 // NotValid without an exception indicates EOF:
c@33 288 if (request.type == RRType::NotValid) {
c@33 289 cerr << "vampipe-server: eof reached" << endl;
c@33 290 break;
c@33 291 }
c@31 292
c@34 293 RequestOrResponse response = handleRequest(request);
c@33 294
c@34 295 cerr << "vampipe-server: request handled, writing response"
c@33 296 << endl;
c@31 297
c@31 298 writeResponseCapnp(response);
c@33 299
c@33 300 cerr << "vampipe-server: response written" << endl;
c@31 301
c@31 302 } catch (std::exception &e) {
c@33 303 cerr << "vampipe-server: error: " << e.what() << endl;
c@31 304 exit(1);
c@31 305 }
c@31 306 }
c@31 307
c@31 308 exit(0);
c@31 309 }