annotate utilities/vampipe-server.cpp @ 33:0b48b10140bb

Switch to non-packed protocol and handle multiple messages and EOF properly; fill in remaining server actions
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 25 May 2016 10:43:07 +0100
parents 2d97883d20df
children ba58fe5ee2dd
rev   line source
c@31 1
c@31 2 #include "VampnProto.h"
c@31 3
c@31 4 #include "bits/RequestOrResponse.h"
c@31 5
c@31 6 #include <iostream>
c@31 7 #include <sstream>
c@31 8 #include <stdexcept>
c@31 9
c@32 10 #include <map>
c@32 11 #include <set>
c@32 12
c@31 13 using namespace std;
c@31 14 using namespace vampipe;
c@32 15 using namespace Vamp;
c@32 16 using namespace Vamp::HostExt;
c@31 17
c@31 18 void usage()
c@31 19 {
c@31 20 string myname = "vampipe-server";
c@31 21 cerr << "\n" << myname <<
c@31 22 ": Load and run Vamp plugins in response to messages from stdin\n\n"
c@31 23 " Usage: " << myname << "\n\n"
c@31 24 "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
c@31 25 "and writes Vamp response messages in the same format to stdout.\n\n";
c@31 26
c@31 27 exit(2);
c@31 28 }
c@31 29
c@32 30 class Mapper : public PluginHandleMapper
c@32 31 {
c@32 32 public:
c@32 33 Mapper() : m_nextHandle(1) { }
c@32 34
c@32 35 void addPlugin(Plugin *p) {
c@32 36 if (m_rplugins.find(p) == m_rplugins.end()) {
c@32 37 int32_t h = m_nextHandle++;
c@32 38 m_plugins[h] = p;
c@32 39 m_rplugins[p] = h;
c@32 40 }
c@32 41 }
c@33 42
c@33 43 void removePlugin(int32_t h) {
c@33 44 if (m_plugins.find(h) == m_plugins.end()) {
c@33 45 throw NotFound();
c@33 46 }
c@33 47 Plugin *p = m_plugins[h];
c@33 48 m_plugins.erase(h);
c@33 49 m_rplugins.erase(p);
c@33 50 }
c@32 51
c@32 52 int32_t pluginToHandle(Plugin *p) {
c@32 53 if (m_rplugins.find(p) == m_rplugins.end()) {
c@32 54 throw NotFound();
c@32 55 }
c@32 56 return m_rplugins[p];
c@32 57 }
c@32 58
c@32 59 Plugin *handleToPlugin(int32_t h) {
c@32 60 if (m_plugins.find(h) == m_plugins.end()) {
c@32 61 throw NotFound();
c@32 62 }
c@32 63 return m_plugins[h];
c@32 64 }
c@32 65
c@33 66 bool isConfigured(int32_t h) {
c@33 67 return m_configuredPlugins.find(h) != m_configuredPlugins.end();
c@32 68 }
c@32 69
c@33 70 void markConfigured(int32_t h) {
c@33 71 m_configuredPlugins.insert(h);
c@32 72 }
c@32 73
c@32 74 private:
c@32 75 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number
c@32 76 map<uint32_t, Plugin *> m_plugins;
c@32 77 map<Plugin *, uint32_t> m_rplugins;
c@33 78 set<uint32_t> m_configuredPlugins;
c@32 79 };
c@32 80
c@32 81 static Mapper mapper;
c@32 82
c@31 83 RequestOrResponse
c@31 84 readRequestCapnp()
c@31 85 {
c@31 86 RequestOrResponse rr;
c@31 87 rr.direction = RequestOrResponse::Request;
c@31 88
c@33 89 static kj::FdInputStream stream(0); // stdin
c@33 90 static kj::BufferedInputStreamWrapper buffered(stream);
c@33 91
c@33 92 if (buffered.tryGetReadBuffer() == nullptr) {
c@33 93 rr.type = RRType::NotValid;
c@33 94 return rr;
c@33 95 }
c@33 96
c@33 97 ::capnp::InputStreamMessageReader message(buffered);
c@31 98 VampRequest::Reader reader = message.getRoot<VampRequest>();
c@31 99
c@31 100 rr.type = VampnProto::getRequestResponseType(reader);
c@31 101
c@31 102 switch (rr.type) {
c@31 103
c@31 104 case RRType::List:
c@31 105 VampnProto::readVampRequest_List(reader); // type check only
c@31 106 break;
c@31 107 case RRType::Load:
c@31 108 VampnProto::readVampRequest_Load(rr.loadRequest, reader);
c@31 109 break;
c@31 110 case RRType::Configure:
c@32 111 VampnProto::readVampRequest_Configure(rr.configurationRequest,
c@32 112 reader, mapper);
c@31 113 break;
c@31 114 case RRType::Process:
c@32 115 VampnProto::readVampRequest_Process(rr.processRequest, reader, mapper);
c@31 116 break;
c@31 117 case RRType::Finish:
c@32 118 VampnProto::readVampRequest_Finish(rr.finishPlugin, reader, mapper);
c@31 119 break;
c@31 120 case RRType::NotValid:
c@31 121 break;
c@31 122 }
c@31 123
c@31 124 return rr;
c@31 125 }
c@31 126
c@31 127 void
c@31 128 writeResponseCapnp(RequestOrResponse &rr)
c@31 129 {
c@31 130 ::capnp::MallocMessageBuilder message;
c@31 131 VampResponse::Builder builder = message.initRoot<VampResponse>();
c@31 132
c@31 133 switch (rr.type) {
c@31 134
c@31 135 case RRType::List:
c@31 136 VampnProto::buildVampResponse_List(builder, "", rr.listResponse);
c@31 137 break;
c@31 138 case RRType::Load:
c@32 139 VampnProto::buildVampResponse_Load(builder, rr.loadResponse, mapper);
c@31 140 break;
c@31 141 case RRType::Configure:
c@31 142 VampnProto::buildVampResponse_Configure(builder, rr.configurationResponse);
c@31 143 break;
c@31 144 case RRType::Process:
c@31 145 VampnProto::buildVampResponse_Process(builder, rr.processResponse);
c@31 146 break;
c@31 147 case RRType::Finish:
c@31 148 VampnProto::buildVampResponse_Finish(builder, rr.finishResponse);
c@31 149 break;
c@31 150 case RRType::NotValid:
c@31 151 break;
c@31 152 }
c@31 153
c@33 154 writeMessageToFd(1, message);
c@31 155 }
c@31 156
c@31 157 RequestOrResponse
c@31 158 processRequest(const RequestOrResponse &request)
c@31 159 {
c@31 160 RequestOrResponse response;
c@31 161 response.direction = RequestOrResponse::Response;
c@32 162 response.type = request.type;
c@32 163
c@32 164 auto loader = PluginLoader::getInstance();
c@32 165
c@32 166 switch (request.type) {
c@32 167
c@32 168 case RRType::List:
c@32 169 response.listResponse = loader->listPluginData();
c@32 170 response.success = true;
c@32 171 break;
c@32 172
c@32 173 case RRType::Load:
c@32 174 response.loadResponse = loader->loadPlugin(request.loadRequest);
c@32 175 if (response.loadResponse.plugin != nullptr) {
c@32 176 mapper.addPlugin(response.loadResponse.plugin);
c@32 177 response.success = true;
c@32 178 }
c@32 179 break;
c@32 180
c@33 181 case RRType::Configure:
c@33 182 {
c@33 183 auto h = mapper.pluginToHandle(request.configurationRequest.plugin);
c@33 184 if (mapper.isConfigured(h)) {
c@33 185 throw runtime_error("plugin has already been configured");
c@33 186 }
c@33 187
c@33 188 response.configurationResponse =
c@33 189 loader->configurePlugin(request.configurationRequest);
c@33 190
c@33 191 if (!response.configurationResponse.outputs.empty()) {
c@33 192 mapper.markConfigured(h);
c@33 193 response.success = true;
c@33 194 }
c@33 195 break;
c@33 196 }
c@33 197
c@33 198 case RRType::Process:
c@33 199 {
c@33 200 auto &preq = request.processRequest;
c@33 201 int channels = int(preq.inputBuffers.size());
c@33 202 const float **fbuffers = new const float *[channels];
c@33 203 for (int i = 0; i < channels; ++i) {
c@33 204 fbuffers[i] = preq.inputBuffers[i].data();
c@33 205 }
c@33 206
c@33 207 response.processResponse.features =
c@33 208 preq.plugin->process(fbuffers, preq.timestamp);
c@33 209 response.success = true;
c@33 210
c@33 211 delete[] fbuffers;
c@33 212 break;
c@33 213 }
c@33 214
c@33 215 case RRType::Finish:
c@33 216 {
c@33 217 auto h = mapper.pluginToHandle(request.finishPlugin);
c@33 218
c@33 219 response.finishResponse.features =
c@33 220 request.finishPlugin->getRemainingFeatures();
c@33 221
c@33 222 mapper.removePlugin(h);
c@33 223 delete request.finishPlugin;
c@33 224 response.success = true;
c@33 225 break;
c@33 226 }
c@33 227
c@33 228 case RRType::NotValid:
c@33 229 break;
c@32 230 }
c@32 231
c@31 232 return response;
c@31 233 }
c@31 234
c@31 235 int main(int argc, char **argv)
c@31 236 {
c@31 237 if (argc != 1) {
c@31 238 usage();
c@31 239 }
c@31 240
c@31 241 while (true) {
c@31 242
c@31 243 try {
c@31 244
c@31 245 RequestOrResponse request = readRequestCapnp();
c@31 246
c@33 247 cerr << "vampipe-server: request received, of type "
c@33 248 << int(request.type)
c@33 249 << endl;
c@33 250
c@31 251 // NotValid without an exception indicates EOF:
c@33 252 if (request.type == RRType::NotValid) {
c@33 253 cerr << "vampipe-server: eof reached" << endl;
c@33 254 break;
c@33 255 }
c@31 256
c@31 257 RequestOrResponse response = processRequest(request);
c@33 258
c@33 259 cerr << "vampipe-server: request processed, writing response"
c@33 260 << endl;
c@31 261
c@31 262 writeResponseCapnp(response);
c@33 263
c@33 264 cerr << "vampipe-server: response written" << endl;
c@31 265
c@31 266 } catch (std::exception &e) {
c@33 267 cerr << "vampipe-server: error: " << e.what() << endl;
c@31 268 exit(1);
c@31 269 }
c@31 270 }
c@31 271
c@31 272 exit(0);
c@31 273 }