annotate utilities/vampipe-server.cpp @ 49:f3f7561233d6

Begin plugin output id / index mapping for use in feature sets
author Chris Cannam <c.cannam@qmul.ac.uk>
date Fri, 16 Sep 2016 14:13:21 +0100
parents 55d69b26d4db
children f4244a2d55ac
rev   line source
c@31 1
#include "VampnProto.h"

#include "bits/RequestOrResponse.h"
#include "bits/CountingPluginHandleMapper.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <map>
#include <set>
#include <vector>
c@32 13
c@31 14 using namespace std;
c@31 15 using namespace vampipe;
c@32 16 using namespace Vamp;
c@32 17 using namespace Vamp::HostExt;
c@31 18
/**
 * Print a short usage summary to stderr, then terminate the process
 * with exit status 2. This function does not return.
 */
void usage()
{
    const std::string myname("vampipe-server");

    std::cerr << "\n" << myname
              << ": Load and run Vamp plugins in response to messages from stdin\n\n";
    std::cerr << " Usage: " << myname << "\n\n";
    std::cerr << "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
              << "and writes Vamp response messages in the same format to stdout.\n\n";

    exit(2);
}
c@31 30
// File-scope mapper between loaded plugin instances and their handles.
// handleRequest() registers and removes plugins here and records each
// plugin's configured channel count and block size; the request reader and
// response writer pass it to the VampnProto conversion functions.
c@40 31 static CountingPluginHandleMapper mapper;
c@32 32
c@31 33 RequestOrResponse
c@31 34 readRequestCapnp()
c@31 35 {
c@31 36 RequestOrResponse rr;
c@31 37 rr.direction = RequestOrResponse::Request;
c@31 38
c@33 39 static kj::FdInputStream stream(0); // stdin
c@33 40 static kj::BufferedInputStreamWrapper buffered(stream);
c@33 41
c@33 42 if (buffered.tryGetReadBuffer() == nullptr) {
c@33 43 rr.type = RRType::NotValid;
c@33 44 return rr;
c@33 45 }
c@33 46
c@33 47 ::capnp::InputStreamMessageReader message(buffered);
c@31 48 VampRequest::Reader reader = message.getRoot<VampRequest>();
c@31 49
c@31 50 rr.type = VampnProto::getRequestResponseType(reader);
c@31 51
c@31 52 switch (rr.type) {
c@31 53
c@31 54 case RRType::List:
c@31 55 VampnProto::readVampRequest_List(reader); // type check only
c@31 56 break;
c@31 57 case RRType::Load:
c@31 58 VampnProto::readVampRequest_Load(rr.loadRequest, reader);
c@31 59 break;
c@31 60 case RRType::Configure:
c@32 61 VampnProto::readVampRequest_Configure(rr.configurationRequest,
c@32 62 reader, mapper);
c@31 63 break;
c@31 64 case RRType::Process:
c@32 65 VampnProto::readVampRequest_Process(rr.processRequest, reader, mapper);
c@31 66 break;
c@31 67 case RRType::Finish:
c@32 68 VampnProto::readVampRequest_Finish(rr.finishPlugin, reader, mapper);
c@31 69 break;
c@31 70 case RRType::NotValid:
c@31 71 break;
c@31 72 }
c@31 73
c@31 74 return rr;
c@31 75 }
c@31 76
c@31 77 void
c@31 78 writeResponseCapnp(RequestOrResponse &rr)
c@31 79 {
c@31 80 ::capnp::MallocMessageBuilder message;
c@31 81 VampResponse::Builder builder = message.initRoot<VampResponse>();
c@31 82
c@31 83 switch (rr.type) {
c@31 84
c@31 85 case RRType::List:
c@31 86 VampnProto::buildVampResponse_List(builder, "", rr.listResponse);
c@31 87 break;
c@31 88 case RRType::Load:
c@32 89 VampnProto::buildVampResponse_Load(builder, rr.loadResponse, mapper);
c@31 90 break;
c@31 91 case RRType::Configure:
c@31 92 VampnProto::buildVampResponse_Configure(builder, rr.configurationResponse);
c@31 93 break;
c@31 94 case RRType::Process:
c@31 95 VampnProto::buildVampResponse_Process(builder, rr.processResponse);
c@31 96 break;
c@31 97 case RRType::Finish:
c@31 98 VampnProto::buildVampResponse_Finish(builder, rr.finishResponse);
c@31 99 break;
c@31 100 case RRType::NotValid:
c@31 101 break;
c@31 102 }
c@31 103
c@33 104 writeMessageToFd(1, message);
c@31 105 }
c@31 106
c@31 107 RequestOrResponse
c@34 108 handleRequest(const RequestOrResponse &request)
c@31 109 {
c@31 110 RequestOrResponse response;
c@31 111 response.direction = RequestOrResponse::Response;
c@32 112 response.type = request.type;
c@32 113
c@32 114 auto loader = PluginLoader::getInstance();
c@32 115
c@32 116 switch (request.type) {
c@32 117
c@32 118 case RRType::List:
c@32 119 response.listResponse = loader->listPluginData();
c@32 120 response.success = true;
c@32 121 break;
c@32 122
c@32 123 case RRType::Load:
c@32 124 response.loadResponse = loader->loadPlugin(request.loadRequest);
c@32 125 if (response.loadResponse.plugin != nullptr) {
c@32 126 mapper.addPlugin(response.loadResponse.plugin);
c@32 127 response.success = true;
c@32 128 }
c@32 129 break;
c@32 130
c@33 131 case RRType::Configure:
c@33 132 {
c@34 133 auto &creq = request.configurationRequest;
c@34 134 auto h = mapper.pluginToHandle(creq.plugin);
c@33 135 if (mapper.isConfigured(h)) {
c@33 136 throw runtime_error("plugin has already been configured");
c@33 137 }
c@33 138
c@34 139 response.configurationResponse = loader->configurePlugin(creq);
c@33 140
c@33 141 if (!response.configurationResponse.outputs.empty()) {
c@34 142 mapper.markConfigured
c@34 143 (h, creq.configuration.channelCount, creq.configuration.blockSize);
c@33 144 response.success = true;
c@33 145 }
c@33 146 break;
c@33 147 }
c@33 148
c@33 149 case RRType::Process:
c@33 150 {
c@33 151 auto &preq = request.processRequest;
c@34 152 auto h = mapper.pluginToHandle(preq.plugin);
c@34 153 if (!mapper.isConfigured(h)) {
c@34 154 throw runtime_error("plugin has not been configured");
c@34 155 }
c@34 156
c@33 157 int channels = int(preq.inputBuffers.size());
c@34 158 if (channels != mapper.getChannelCount(h)) {
c@34 159 throw runtime_error("wrong number of channels supplied to process");
c@34 160 }
c@34 161
c@33 162 const float **fbuffers = new const float *[channels];
c@33 163 for (int i = 0; i < channels; ++i) {
c@34 164 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
c@34 165 delete[] fbuffers;
c@34 166 throw runtime_error("wrong block size supplied to process");
c@34 167 }
c@33 168 fbuffers[i] = preq.inputBuffers[i].data();
c@33 169 }
c@33 170
c@33 171 response.processResponse.features =
c@33 172 preq.plugin->process(fbuffers, preq.timestamp);
c@33 173 response.success = true;
c@33 174
c@33 175 delete[] fbuffers;
c@33 176 break;
c@33 177 }
c@33 178
c@33 179 case RRType::Finish:
c@33 180 {
c@33 181 auto h = mapper.pluginToHandle(request.finishPlugin);
c@33 182
c@33 183 response.finishResponse.features =
c@33 184 request.finishPlugin->getRemainingFeatures();
c@33 185
c@33 186 mapper.removePlugin(h);
c@33 187 delete request.finishPlugin;
c@33 188 response.success = true;
c@33 189 break;
c@33 190 }
c@33 191
c@33 192 case RRType::NotValid:
c@33 193 break;
c@32 194 }
c@32 195
c@31 196 return response;
c@31 197 }
c@31 198
c@31 199 int main(int argc, char **argv)
c@31 200 {
c@31 201 if (argc != 1) {
c@31 202 usage();
c@31 203 }
c@31 204
c@31 205 while (true) {
c@31 206
c@31 207 try {
c@31 208
c@31 209 RequestOrResponse request = readRequestCapnp();
c@31 210
c@33 211 cerr << "vampipe-server: request received, of type "
c@33 212 << int(request.type)
c@33 213 << endl;
c@33 214
c@31 215 // NotValid without an exception indicates EOF:
c@33 216 if (request.type == RRType::NotValid) {
c@33 217 cerr << "vampipe-server: eof reached" << endl;
c@33 218 break;
c@33 219 }
c@31 220
c@34 221 RequestOrResponse response = handleRequest(request);
c@33 222
c@34 223 cerr << "vampipe-server: request handled, writing response"
c@33 224 << endl;
c@31 225
c@31 226 writeResponseCapnp(response);
c@33 227
c@33 228 cerr << "vampipe-server: response written" << endl;
c@31 229
c@31 230 } catch (std::exception &e) {
c@33 231 cerr << "vampipe-server: error: " << e.what() << endl;
c@31 232 exit(1);
c@31 233 }
c@31 234 }
c@31 235
c@31 236 exit(0);
c@31 237 }