c@75
|
1
|
c@75
|
#include "vamp-capnp/VampnProto.h"
#include "vamp-support/RequestOrResponse.h"
#include "vamp-support/CountingPluginHandleMapper.h"
#include "vamp-support/LoaderRequests.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <capnp/serialize.h>

#include <map>
#include <set>
#include <vector>

#include <unistd.h> // getpid for logging
|
c@103
|
17
|
c@75
|
18 using namespace std;
|
c@97
|
19 using namespace piper_vamp;
|
c@75
|
20 using namespace Vamp;
|
c@75
|
21
|
c@102
|
22 //!!! This could be faster and lighter:
|
c@102
|
23 // - Use Capnp structures directly rather than converting to vamp-support ones
|
c@102
|
24 // - Use Vamp C API (vamp.h) directly rather than converting to C++
|
c@102
|
25 //!!! Doing the above for process() and finish() alone would be a good start
|
c@102
|
26
|
c@103
|
// Process id, looked up once during static initialisation and used as a
// prefix on the diagnostic lines this file writes to stderr.
static int pid = getpid();
|
c@103
|
28
|
c@75
|
/**
 * Print a short usage summary to stderr and terminate the process with
 * exit status 2. This function does not return.
 */
void usage()
{
    const std::string programName = "piper-vamp-server";

    std::cerr << "\n" << programName
              << ": Load and run Vamp plugins in response to messages from stdin\n\n";
    std::cerr << " Usage: " << programName << "\n\n";
    std::cerr << "Expects Piper request messages in Cap'n Proto packed format on stdin,\n"
              << "and writes Piper response messages in the same format to stdout.\n\n";

    exit(2);
}
|
c@75
|
40
|
c@75
|
41 static CountingPluginHandleMapper mapper;
|
c@75
|
42
|
c@97
|
43 static RequestOrResponse::RpcId readId(const piper::RpcRequest::Reader &r)
|
c@75
|
44 {
|
c@75
|
45 int number;
|
c@75
|
46 string tag;
|
c@75
|
47 switch (r.getId().which()) {
|
c@97
|
48 case piper::RpcRequest::Id::Which::NUMBER:
|
c@75
|
49 number = r.getId().getNumber();
|
c@75
|
50 return { RequestOrResponse::RpcId::Number, number, "" };
|
c@97
|
51 case piper::RpcRequest::Id::Which::TAG:
|
c@75
|
52 tag = r.getId().getTag();
|
c@75
|
53 return { RequestOrResponse::RpcId::Tag, 0, tag };
|
c@97
|
54 case piper::RpcRequest::Id::Which::NONE:
|
c@75
|
55 return { RequestOrResponse::RpcId::Absent, 0, "" };
|
c@75
|
56 }
|
c@75
|
57 return {};
|
c@75
|
58 }
|
c@75
|
59
|
c@97
|
60 static void buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
|
c@75
|
61 {
|
c@75
|
62 switch (id.type) {
|
c@75
|
63 case RequestOrResponse::RpcId::Number:
|
c@75
|
64 b.getId().setNumber(id.number);
|
c@75
|
65 break;
|
c@75
|
66 case RequestOrResponse::RpcId::Tag:
|
c@75
|
67 b.getId().setTag(id.tag);
|
c@75
|
68 break;
|
c@75
|
69 case RequestOrResponse::RpcId::Absent:
|
c@75
|
70 b.getId().setNone();
|
c@75
|
71 break;
|
c@75
|
72 }
|
c@75
|
73 }
|
c@75
|
74
|
c@75
|
75 RequestOrResponse
|
c@75
|
76 readRequestCapnp()
|
c@75
|
77 {
|
c@75
|
78 RequestOrResponse rr;
|
c@75
|
79 rr.direction = RequestOrResponse::Request;
|
c@75
|
80
|
c@75
|
81 static kj::FdInputStream stream(0); // stdin
|
c@75
|
82 static kj::BufferedInputStreamWrapper buffered(stream);
|
c@75
|
83
|
c@75
|
84 if (buffered.tryGetReadBuffer() == nullptr) {
|
c@75
|
85 rr.type = RRType::NotValid;
|
c@75
|
86 return rr;
|
c@75
|
87 }
|
c@75
|
88
|
c@97
|
89 capnp::InputStreamMessageReader message(buffered);
|
c@97
|
90 piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>();
|
c@75
|
91
|
c@75
|
92 rr.type = VampnProto::getRequestResponseType(reader);
|
c@75
|
93 rr.id = readId(reader);
|
c@75
|
94
|
c@75
|
95 switch (rr.type) {
|
c@75
|
96
|
c@75
|
97 case RRType::List:
|
c@75
|
98 VampnProto::readRpcRequest_List(reader); // type check only
|
c@75
|
99 break;
|
c@75
|
100 case RRType::Load:
|
c@75
|
101 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
|
c@75
|
102 break;
|
c@75
|
103 case RRType::Configure:
|
c@75
|
104 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
|
c@75
|
105 reader, mapper);
|
c@75
|
106 break;
|
c@75
|
107 case RRType::Process:
|
c@75
|
108 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
|
c@75
|
109 break;
|
c@75
|
110 case RRType::Finish:
|
c@75
|
111 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
|
c@75
|
112 break;
|
c@75
|
113 case RRType::NotValid:
|
c@75
|
114 break;
|
c@75
|
115 }
|
c@75
|
116
|
c@75
|
117 return rr;
|
c@75
|
118 }
|
c@75
|
119
|
c@75
|
120 void
|
c@75
|
121 writeResponseCapnp(RequestOrResponse &rr)
|
c@75
|
122 {
|
c@97
|
123 capnp::MallocMessageBuilder message;
|
c@97
|
124 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
|
c@75
|
125
|
c@75
|
126 buildId(builder, rr.id);
|
c@75
|
127
|
c@75
|
128 if (!rr.success) {
|
c@75
|
129
|
c@75
|
130 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
|
c@75
|
131
|
c@75
|
132 } else {
|
c@75
|
133
|
c@75
|
134 switch (rr.type) {
|
c@75
|
135
|
c@75
|
136 case RRType::List:
|
c@75
|
137 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
|
c@75
|
138 break;
|
c@75
|
139 case RRType::Load:
|
c@75
|
140 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
|
c@75
|
141 break;
|
c@75
|
142 case RRType::Configure:
|
c@75
|
143 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
|
c@75
|
144 break;
|
c@75
|
145 case RRType::Process:
|
c@75
|
146 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
|
c@75
|
147 break;
|
c@75
|
148 case RRType::Finish:
|
c@75
|
149 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
|
c@75
|
150 break;
|
c@75
|
151 case RRType::NotValid:
|
c@75
|
152 break;
|
c@75
|
153 }
|
c@75
|
154 }
|
c@75
|
155
|
c@75
|
156 writeMessageToFd(1, message);
|
c@75
|
157 }
|
c@75
|
158
|
c@75
|
159 void
|
c@75
|
160 writeExceptionCapnp(const std::exception &e, RRType type)
|
c@75
|
161 {
|
c@97
|
162 capnp::MallocMessageBuilder message;
|
c@97
|
163 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
|
c@75
|
164 VampnProto::buildRpcResponse_Exception(builder, e, type);
|
c@75
|
165
|
c@75
|
166 writeMessageToFd(1, message);
|
c@75
|
167 }
|
c@75
|
168
|
c@75
|
169 RequestOrResponse
|
c@75
|
170 handleRequest(const RequestOrResponse &request)
|
c@75
|
171 {
|
c@75
|
172 RequestOrResponse response;
|
c@75
|
173 response.direction = RequestOrResponse::Response;
|
c@75
|
174 response.type = request.type;
|
c@75
|
175
|
c@75
|
176 switch (request.type) {
|
c@75
|
177
|
c@75
|
178 case RRType::List:
|
c@97
|
179 response.listResponse = LoaderRequests().listPluginData();
|
c@75
|
180 response.success = true;
|
c@75
|
181 break;
|
c@75
|
182
|
c@75
|
183 case RRType::Load:
|
c@97
|
184 response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest);
|
c@75
|
185 if (response.loadResponse.plugin != nullptr) {
|
c@75
|
186 mapper.addPlugin(response.loadResponse.plugin);
|
c@103
|
187 cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl;
|
c@75
|
188 response.success = true;
|
c@75
|
189 }
|
c@75
|
190 break;
|
c@75
|
191
|
c@75
|
192 case RRType::Configure:
|
c@75
|
193 {
|
c@75
|
194 auto &creq = request.configurationRequest;
|
c@75
|
195 auto h = mapper.pluginToHandle(creq.plugin);
|
c@75
|
196 if (mapper.isConfigured(h)) {
|
c@75
|
197 throw runtime_error("plugin has already been configured");
|
c@75
|
198 }
|
c@75
|
199
|
c@97
|
200 response.configurationResponse = LoaderRequests().configurePlugin(creq);
|
c@75
|
201
|
c@75
|
202 if (!response.configurationResponse.outputs.empty()) {
|
c@75
|
203 mapper.markConfigured
|
c@75
|
204 (h, creq.configuration.channelCount, creq.configuration.blockSize);
|
c@75
|
205 response.success = true;
|
c@75
|
206 }
|
c@75
|
207 break;
|
c@75
|
208 }
|
c@75
|
209
|
c@75
|
210 case RRType::Process:
|
c@75
|
211 {
|
c@75
|
212 auto &preq = request.processRequest;
|
c@75
|
213 auto h = mapper.pluginToHandle(preq.plugin);
|
c@75
|
214 if (!mapper.isConfigured(h)) {
|
c@75
|
215 throw runtime_error("plugin has not been configured");
|
c@75
|
216 }
|
c@75
|
217
|
c@75
|
218 int channels = int(preq.inputBuffers.size());
|
c@75
|
219 if (channels != mapper.getChannelCount(h)) {
|
c@75
|
220 throw runtime_error("wrong number of channels supplied to process");
|
c@75
|
221 }
|
c@75
|
222
|
c@75
|
223 const float **fbuffers = new const float *[channels];
|
c@75
|
224 for (int i = 0; i < channels; ++i) {
|
c@75
|
225 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
|
c@75
|
226 delete[] fbuffers;
|
c@75
|
227 throw runtime_error("wrong block size supplied to process");
|
c@75
|
228 }
|
c@75
|
229 fbuffers[i] = preq.inputBuffers[i].data();
|
c@75
|
230 }
|
c@75
|
231
|
c@75
|
232 response.processResponse.plugin = preq.plugin;
|
c@75
|
233 response.processResponse.features =
|
c@75
|
234 preq.plugin->process(fbuffers, preq.timestamp);
|
c@75
|
235 response.success = true;
|
c@75
|
236
|
c@75
|
237 delete[] fbuffers;
|
c@75
|
238 break;
|
c@75
|
239 }
|
c@75
|
240
|
c@75
|
241 case RRType::Finish:
|
c@75
|
242 {
|
c@77
|
243 auto &freq = request.finishRequest;
|
c@77
|
244 response.finishResponse.plugin = freq.plugin;
|
c@77
|
245
|
c@77
|
246 auto h = mapper.pluginToHandle(freq.plugin);
|
c@77
|
247 // Finish can be called (to unload the plugin) even if the
|
c@77
|
248 // plugin has never been configured or used. But we want to
|
c@77
|
249 // make sure we call getRemainingFeatures only if we have
|
c@77
|
250 // actually configured the plugin.
|
c@77
|
251 if (mapper.isConfigured(h)) {
|
c@77
|
252 response.finishResponse.features = freq.plugin->getRemainingFeatures();
|
c@77
|
253 }
|
c@75
|
254
|
c@75
|
255 // We do not delete the plugin here -- we need it in the
|
c@77
|
256 // mapper when converting the features. It gets deleted in the
|
c@77
|
257 // calling function.
|
c@75
|
258 response.success = true;
|
c@75
|
259 break;
|
c@75
|
260 }
|
c@75
|
261
|
c@75
|
262 case RRType::NotValid:
|
c@75
|
263 break;
|
c@75
|
264 }
|
c@75
|
265
|
c@75
|
266 return response;
|
c@75
|
267 }
|
c@75
|
268
|
c@103
|
269 int main(int argc, char **)
|
c@75
|
270 {
|
c@75
|
271 if (argc != 1) {
|
c@75
|
272 usage();
|
c@75
|
273 }
|
c@75
|
274
|
c@75
|
275 while (true) {
|
c@75
|
276
|
c@75
|
277 RequestOrResponse request;
|
c@75
|
278
|
c@75
|
279 try {
|
c@75
|
280
|
c@75
|
281 request = readRequestCapnp();
|
c@75
|
282
|
c@103
|
283 cerr << "piper-vamp-server " << pid << ": request received, of type "
|
c@75
|
284 << int(request.type)
|
c@75
|
285 << endl;
|
c@75
|
286
|
c@75
|
287 // NotValid without an exception indicates EOF:
|
c@75
|
288 if (request.type == RRType::NotValid) {
|
c@103
|
289 cerr << "piper-vamp-server " << pid << ": eof reached, exiting" << endl;
|
c@75
|
290 break;
|
c@75
|
291 }
|
c@75
|
292
|
c@75
|
293 RequestOrResponse response = handleRequest(request);
|
c@75
|
294 response.id = request.id;
|
c@75
|
295
|
c@103
|
296 cerr << "piper-vamp-server " << pid << ": request handled, writing response"
|
c@75
|
297 << endl;
|
c@75
|
298
|
c@75
|
299 writeResponseCapnp(response);
|
c@75
|
300
|
c@103
|
301 cerr << "piper-vamp-server " << pid << ": response written" << endl;
|
c@75
|
302
|
c@75
|
303 if (request.type == RRType::Finish) {
|
c@75
|
304 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
|
c@103
|
305 cerr << "piper-vamp-server " << pid << ": deleting the plugin with handle " << h << endl;
|
c@75
|
306 mapper.removePlugin(h);
|
c@75
|
307 delete request.finishRequest.plugin;
|
c@75
|
308 }
|
c@75
|
309
|
c@75
|
310 } catch (std::exception &e) {
|
c@75
|
311
|
c@103
|
312 cerr << "piper-vamp-server " << pid << ": error: " << e.what() << endl;
|
c@75
|
313
|
c@75
|
314 writeExceptionCapnp(e, request.type);
|
c@75
|
315
|
c@75
|
316 exit(1);
|
c@75
|
317 }
|
c@75
|
318 }
|
c@75
|
319
|
c@75
|
320 exit(0);
|
c@75
|
321 }
|