comparison vamp-server/server.cpp @ 75:81e1c48e97f9

Rearrange and rename to Piper C++ structure
author Chris Cannam <c.cannam@qmul.ac.uk>
date Mon, 10 Oct 2016 16:31:09 +0100
parents
children ac1e4634479b
comparison
equal deleted inserted replaced
74:d45cfa25aaad 75:81e1c48e97f9
1
#include "vamp-capnp/VampnProto.h"
#include "vamp-support/RequestOrResponse.h"
#include "vamp-support/CountingPluginHandleMapper.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <map>
#include <set>
#include <vector>
12
13 using namespace std;
14 using namespace piper;
15 using namespace Vamp;
16 using namespace Vamp::HostExt;
17
18 void usage()
19 {
20 string myname = "piper-vamp-server";
21 cerr << "\n" << myname <<
22 ": Load and run Vamp plugins in response to messages from stdin\n\n"
23 " Usage: " << myname << "\n\n"
24 "Expects Piper request messages in Cap'n Proto packed format on stdin,\n"
25 "and writes Piper response messages in the same format to stdout.\n\n";
26
27 exit(2);
28 }
29
// File-wide plugin/handle mapper. In this file it is used when reading
// requests and building responses, to register loaded plugins
// (addPlugin/removePlugin), to look up a plugin's handle (pluginToHandle),
// and to record and query each plugin's configured state, channel count
// and block size.
30 static CountingPluginHandleMapper mapper;
31
32 static RequestOrResponse::RpcId readId(const RpcRequest::Reader &r)
33 {
34 int number;
35 string tag;
36 switch (r.getId().which()) {
37 case RpcRequest::Id::Which::NUMBER:
38 number = r.getId().getNumber();
39 return { RequestOrResponse::RpcId::Number, number, "" };
40 case RpcRequest::Id::Which::TAG:
41 tag = r.getId().getTag();
42 return { RequestOrResponse::RpcId::Tag, 0, tag };
43 case RpcRequest::Id::Which::NONE:
44 return { RequestOrResponse::RpcId::Absent, 0, "" };
45 }
46 return {};
47 }
48
49 static void buildId(RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
50 {
51 switch (id.type) {
52 case RequestOrResponse::RpcId::Number:
53 b.getId().setNumber(id.number);
54 break;
55 case RequestOrResponse::RpcId::Tag:
56 b.getId().setTag(id.tag);
57 break;
58 case RequestOrResponse::RpcId::Absent:
59 b.getId().setNone();
60 break;
61 }
62 }
63
64 RequestOrResponse
65 readRequestCapnp()
66 {
67 RequestOrResponse rr;
68 rr.direction = RequestOrResponse::Request;
69
70 static kj::FdInputStream stream(0); // stdin
71 static kj::BufferedInputStreamWrapper buffered(stream);
72
73 if (buffered.tryGetReadBuffer() == nullptr) {
74 rr.type = RRType::NotValid;
75 return rr;
76 }
77
78 ::capnp::InputStreamMessageReader message(buffered);
79 RpcRequest::Reader reader = message.getRoot<RpcRequest>();
80
81 rr.type = VampnProto::getRequestResponseType(reader);
82 rr.id = readId(reader);
83
84 switch (rr.type) {
85
86 case RRType::List:
87 VampnProto::readRpcRequest_List(reader); // type check only
88 break;
89 case RRType::Load:
90 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
91 break;
92 case RRType::Configure:
93 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
94 reader, mapper);
95 break;
96 case RRType::Process:
97 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
98 break;
99 case RRType::Finish:
100 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
101 break;
102 case RRType::NotValid:
103 break;
104 }
105
106 return rr;
107 }
108
109 void
110 writeResponseCapnp(RequestOrResponse &rr)
111 {
112 ::capnp::MallocMessageBuilder message;
113 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
114
115 buildId(builder, rr.id);
116
117 if (!rr.success) {
118
119 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
120
121 } else {
122
123 switch (rr.type) {
124
125 case RRType::List:
126 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
127 break;
128 case RRType::Load:
129 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
130 break;
131 case RRType::Configure:
132 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
133 break;
134 case RRType::Process:
135 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
136 break;
137 case RRType::Finish:
138 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
139 break;
140 case RRType::NotValid:
141 break;
142 }
143 }
144
145 writeMessageToFd(1, message);
146 }
147
148 void
149 writeExceptionCapnp(const std::exception &e, RRType type)
150 {
151 ::capnp::MallocMessageBuilder message;
152 RpcResponse::Builder builder = message.initRoot<RpcResponse>();
153 VampnProto::buildRpcResponse_Exception(builder, e, type);
154
155 writeMessageToFd(1, message);
156 }
157
158 RequestOrResponse
159 handleRequest(const RequestOrResponse &request)
160 {
161 RequestOrResponse response;
162 response.direction = RequestOrResponse::Response;
163 response.type = request.type;
164
165 auto loader = PluginLoader::getInstance();
166
167 switch (request.type) {
168
169 case RRType::List:
170 response.listResponse = loader->listPluginData();
171 response.success = true;
172 break;
173
174 case RRType::Load:
175 response.loadResponse = loader->loadPlugin(request.loadRequest);
176 if (response.loadResponse.plugin != nullptr) {
177 mapper.addPlugin(response.loadResponse.plugin);
178 response.success = true;
179 }
180 break;
181
182 case RRType::Configure:
183 {
184 auto &creq = request.configurationRequest;
185 auto h = mapper.pluginToHandle(creq.plugin);
186 if (mapper.isConfigured(h)) {
187 throw runtime_error("plugin has already been configured");
188 }
189
190 response.configurationResponse = loader->configurePlugin(creq);
191
192 if (!response.configurationResponse.outputs.empty()) {
193 mapper.markConfigured
194 (h, creq.configuration.channelCount, creq.configuration.blockSize);
195 response.success = true;
196 }
197 break;
198 }
199
200 case RRType::Process:
201 {
202 auto &preq = request.processRequest;
203 auto h = mapper.pluginToHandle(preq.plugin);
204 if (!mapper.isConfigured(h)) {
205 throw runtime_error("plugin has not been configured");
206 }
207
208 int channels = int(preq.inputBuffers.size());
209 if (channels != mapper.getChannelCount(h)) {
210 throw runtime_error("wrong number of channels supplied to process");
211 }
212
213 const float **fbuffers = new const float *[channels];
214 for (int i = 0; i < channels; ++i) {
215 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
216 delete[] fbuffers;
217 throw runtime_error("wrong block size supplied to process");
218 }
219 fbuffers[i] = preq.inputBuffers[i].data();
220 }
221
222 response.processResponse.plugin = preq.plugin;
223 response.processResponse.features =
224 preq.plugin->process(fbuffers, preq.timestamp);
225 response.success = true;
226
227 delete[] fbuffers;
228 break;
229 }
230
231 case RRType::Finish:
232 {
233 response.finishResponse.plugin = request.finishRequest.plugin;
234 response.finishResponse.features =
235 request.finishRequest.plugin->getRemainingFeatures();
236
237 // We do not delete the plugin here -- we need it in the
238 // mapper when converting the features. It gets deleted by the
239 // caller.
240
241 response.success = true;
242 break;
243 }
244
245 case RRType::NotValid:
246 break;
247 }
248
249 return response;
250 }
251
252 int main(int argc, char **argv)
253 {
254 if (argc != 1) {
255 usage();
256 }
257
258 while (true) {
259
260 RequestOrResponse request;
261
262 try {
263
264 request = readRequestCapnp();
265
266 cerr << "piper-vamp-server: request received, of type "
267 << int(request.type)
268 << endl;
269
270 // NotValid without an exception indicates EOF:
271 if (request.type == RRType::NotValid) {
272 cerr << "piper-vamp-server: eof reached" << endl;
273 break;
274 }
275
276 RequestOrResponse response = handleRequest(request);
277 response.id = request.id;
278
279 cerr << "piper-vamp-server: request handled, writing response"
280 << endl;
281
282 writeResponseCapnp(response);
283
284 cerr << "piper-vamp-server: response written" << endl;
285
286 if (request.type == RRType::Finish) {
287 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
288 mapper.removePlugin(h);
289 delete request.finishRequest.plugin;
290 }
291
292 } catch (std::exception &e) {
293
294 cerr << "piper-vamp-server: error: " << e.what() << endl;
295
296 writeExceptionCapnp(e, request.type);
297
298 exit(1);
299 }
300 }
301
302 exit(0);
303 }