c@75
|
1
|
c@75
|
2 #include "vamp-capnp/VampnProto.h"
|
c@75
|
3 #include "vamp-support/RequestOrResponse.h"
|
c@75
|
4 #include "vamp-support/CountingPluginHandleMapper.h"
|
c@97
|
5 #include "vamp-support/LoaderRequests.h"
|
c@75
|
6
|
c@75
|
7 #include <iostream>
|
c@75
|
8 #include <sstream>
|
c@75
|
9 #include <stdexcept>
|
c@75
|
10
|
c@91
|
11 #include <capnp/serialize.h>
|
c@91
|
12
|
c@75
|
13 #include <map>
|
c@75
|
14 #include <set>
|
c@75
|
15
|
c@109
|
16 // pid for logging
|
c@109
|
17 #ifdef _WIN32
|
c@109
|
18 #include <process.h>
|
c@109
|
19 static int pid = _getpid();
|
c@109
|
20 #else
|
c@109
|
21 #include <unistd.h>
|
c@109
|
22 static int pid = getpid();
|
c@109
|
23 #endif
|
c@103
|
24
|
c@75
|
25 using namespace std;
|
c@97
|
26 using namespace piper_vamp;
|
c@75
|
27 using namespace Vamp;
|
c@75
|
28
|
c@102
|
29 //!!! This could be faster and lighter:
|
c@102
|
30 // - Use Capnp structures directly rather than converting to vamp-support ones
|
c@102
|
31 // - Use Vamp C API (vamp.h) directly rather than converting to C++
|
c@102
|
32 //!!! Doing the above for process() and finish() alone would be a good start
|
c@102
|
33
|
c@75
|
34 void usage()
|
c@75
|
35 {
|
c@75
|
36 string myname = "piper-vamp-server";
|
c@75
|
37 cerr << "\n" << myname <<
|
c@75
|
38 ": Load and run Vamp plugins in response to messages from stdin\n\n"
|
c@75
|
39 " Usage: " << myname << "\n\n"
|
c@75
|
40 "Expects Piper request messages in Cap'n Proto packed format on stdin,\n"
|
c@75
|
41 "and writes Piper response messages in the same format to stdout.\n\n";
|
c@75
|
42
|
c@75
|
43 exit(2);
|
c@75
|
44 }
|
c@75
|
45
|
c@75
|
46 static CountingPluginHandleMapper mapper;
|
c@75
|
47
|
c@97
|
48 static RequestOrResponse::RpcId readId(const piper::RpcRequest::Reader &r)
|
c@75
|
49 {
|
c@75
|
50 int number;
|
c@75
|
51 string tag;
|
c@75
|
52 switch (r.getId().which()) {
|
c@97
|
53 case piper::RpcRequest::Id::Which::NUMBER:
|
c@75
|
54 number = r.getId().getNumber();
|
c@75
|
55 return { RequestOrResponse::RpcId::Number, number, "" };
|
c@97
|
56 case piper::RpcRequest::Id::Which::TAG:
|
c@75
|
57 tag = r.getId().getTag();
|
c@75
|
58 return { RequestOrResponse::RpcId::Tag, 0, tag };
|
c@97
|
59 case piper::RpcRequest::Id::Which::NONE:
|
c@75
|
60 return { RequestOrResponse::RpcId::Absent, 0, "" };
|
c@75
|
61 }
|
c@75
|
62 return {};
|
c@75
|
63 }
|
c@75
|
64
|
c@97
|
65 static void buildId(piper::RpcResponse::Builder &b, const RequestOrResponse::RpcId &id)
|
c@75
|
66 {
|
c@75
|
67 switch (id.type) {
|
c@75
|
68 case RequestOrResponse::RpcId::Number:
|
c@75
|
69 b.getId().setNumber(id.number);
|
c@75
|
70 break;
|
c@75
|
71 case RequestOrResponse::RpcId::Tag:
|
c@75
|
72 b.getId().setTag(id.tag);
|
c@75
|
73 break;
|
c@75
|
74 case RequestOrResponse::RpcId::Absent:
|
c@75
|
75 b.getId().setNone();
|
c@75
|
76 break;
|
c@75
|
77 }
|
c@75
|
78 }
|
c@75
|
79
|
c@75
|
80 RequestOrResponse
|
c@75
|
81 readRequestCapnp()
|
c@75
|
82 {
|
c@75
|
83 RequestOrResponse rr;
|
c@75
|
84 rr.direction = RequestOrResponse::Request;
|
c@75
|
85
|
c@75
|
86 static kj::FdInputStream stream(0); // stdin
|
c@75
|
87 static kj::BufferedInputStreamWrapper buffered(stream);
|
c@75
|
88
|
c@75
|
89 if (buffered.tryGetReadBuffer() == nullptr) {
|
c@75
|
90 rr.type = RRType::NotValid;
|
c@75
|
91 return rr;
|
c@75
|
92 }
|
c@75
|
93
|
c@97
|
94 capnp::InputStreamMessageReader message(buffered);
|
c@97
|
95 piper::RpcRequest::Reader reader = message.getRoot<piper::RpcRequest>();
|
c@75
|
96
|
c@75
|
97 rr.type = VampnProto::getRequestResponseType(reader);
|
c@75
|
98 rr.id = readId(reader);
|
c@75
|
99
|
c@75
|
100 switch (rr.type) {
|
c@75
|
101
|
c@75
|
102 case RRType::List:
|
c@75
|
103 VampnProto::readRpcRequest_List(reader); // type check only
|
c@75
|
104 break;
|
c@75
|
105 case RRType::Load:
|
c@75
|
106 VampnProto::readRpcRequest_Load(rr.loadRequest, reader);
|
c@75
|
107 break;
|
c@75
|
108 case RRType::Configure:
|
c@75
|
109 VampnProto::readRpcRequest_Configure(rr.configurationRequest,
|
c@75
|
110 reader, mapper);
|
c@75
|
111 break;
|
c@75
|
112 case RRType::Process:
|
c@75
|
113 VampnProto::readRpcRequest_Process(rr.processRequest, reader, mapper);
|
c@75
|
114 break;
|
c@75
|
115 case RRType::Finish:
|
c@75
|
116 VampnProto::readRpcRequest_Finish(rr.finishRequest, reader, mapper);
|
c@75
|
117 break;
|
c@75
|
118 case RRType::NotValid:
|
c@75
|
119 break;
|
c@75
|
120 }
|
c@75
|
121
|
c@75
|
122 return rr;
|
c@75
|
123 }
|
c@75
|
124
|
c@75
|
125 void
|
c@75
|
126 writeResponseCapnp(RequestOrResponse &rr)
|
c@75
|
127 {
|
c@97
|
128 capnp::MallocMessageBuilder message;
|
c@97
|
129 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
|
c@75
|
130
|
c@75
|
131 buildId(builder, rr.id);
|
c@75
|
132
|
c@75
|
133 if (!rr.success) {
|
c@75
|
134
|
c@75
|
135 VampnProto::buildRpcResponse_Error(builder, rr.errorText, rr.type);
|
c@75
|
136
|
c@75
|
137 } else {
|
c@75
|
138
|
c@75
|
139 switch (rr.type) {
|
c@75
|
140
|
c@75
|
141 case RRType::List:
|
c@75
|
142 VampnProto::buildRpcResponse_List(builder, rr.listResponse);
|
c@75
|
143 break;
|
c@75
|
144 case RRType::Load:
|
c@75
|
145 VampnProto::buildRpcResponse_Load(builder, rr.loadResponse, mapper);
|
c@75
|
146 break;
|
c@75
|
147 case RRType::Configure:
|
c@75
|
148 VampnProto::buildRpcResponse_Configure(builder, rr.configurationResponse, mapper);
|
c@75
|
149 break;
|
c@75
|
150 case RRType::Process:
|
c@75
|
151 VampnProto::buildRpcResponse_Process(builder, rr.processResponse, mapper);
|
c@75
|
152 break;
|
c@75
|
153 case RRType::Finish:
|
c@75
|
154 VampnProto::buildRpcResponse_Finish(builder, rr.finishResponse, mapper);
|
c@75
|
155 break;
|
c@75
|
156 case RRType::NotValid:
|
c@75
|
157 break;
|
c@75
|
158 }
|
c@75
|
159 }
|
c@75
|
160
|
c@75
|
161 writeMessageToFd(1, message);
|
c@75
|
162 }
|
c@75
|
163
|
c@75
|
164 void
|
c@75
|
165 writeExceptionCapnp(const std::exception &e, RRType type)
|
c@75
|
166 {
|
c@97
|
167 capnp::MallocMessageBuilder message;
|
c@97
|
168 piper::RpcResponse::Builder builder = message.initRoot<piper::RpcResponse>();
|
c@75
|
169 VampnProto::buildRpcResponse_Exception(builder, e, type);
|
c@75
|
170
|
c@75
|
171 writeMessageToFd(1, message);
|
c@75
|
172 }
|
c@75
|
173
|
c@75
|
174 RequestOrResponse
|
c@75
|
175 handleRequest(const RequestOrResponse &request)
|
c@75
|
176 {
|
c@75
|
177 RequestOrResponse response;
|
c@75
|
178 response.direction = RequestOrResponse::Response;
|
c@75
|
179 response.type = request.type;
|
c@75
|
180
|
c@75
|
181 switch (request.type) {
|
c@75
|
182
|
c@75
|
183 case RRType::List:
|
c@97
|
184 response.listResponse = LoaderRequests().listPluginData();
|
c@75
|
185 response.success = true;
|
c@75
|
186 break;
|
c@75
|
187
|
c@75
|
188 case RRType::Load:
|
c@97
|
189 response.loadResponse = LoaderRequests().loadPlugin(request.loadRequest);
|
c@75
|
190 if (response.loadResponse.plugin != nullptr) {
|
c@75
|
191 mapper.addPlugin(response.loadResponse.plugin);
|
c@103
|
192 cerr << "piper-vamp-server " << pid << ": loaded plugin, handle = " << mapper.pluginToHandle(response.loadResponse.plugin) << endl;
|
c@75
|
193 response.success = true;
|
c@75
|
194 }
|
c@75
|
195 break;
|
c@75
|
196
|
c@75
|
197 case RRType::Configure:
|
c@75
|
198 {
|
c@75
|
199 auto &creq = request.configurationRequest;
|
c@75
|
200 auto h = mapper.pluginToHandle(creq.plugin);
|
c@75
|
201 if (mapper.isConfigured(h)) {
|
c@75
|
202 throw runtime_error("plugin has already been configured");
|
c@75
|
203 }
|
c@75
|
204
|
c@97
|
205 response.configurationResponse = LoaderRequests().configurePlugin(creq);
|
c@75
|
206
|
c@75
|
207 if (!response.configurationResponse.outputs.empty()) {
|
c@75
|
208 mapper.markConfigured
|
c@75
|
209 (h, creq.configuration.channelCount, creq.configuration.blockSize);
|
c@75
|
210 response.success = true;
|
c@75
|
211 }
|
c@75
|
212 break;
|
c@75
|
213 }
|
c@75
|
214
|
c@75
|
215 case RRType::Process:
|
c@75
|
216 {
|
c@75
|
217 auto &preq = request.processRequest;
|
c@75
|
218 auto h = mapper.pluginToHandle(preq.plugin);
|
c@75
|
219 if (!mapper.isConfigured(h)) {
|
c@75
|
220 throw runtime_error("plugin has not been configured");
|
c@75
|
221 }
|
c@75
|
222
|
c@75
|
223 int channels = int(preq.inputBuffers.size());
|
c@75
|
224 if (channels != mapper.getChannelCount(h)) {
|
c@75
|
225 throw runtime_error("wrong number of channels supplied to process");
|
c@75
|
226 }
|
c@75
|
227
|
c@75
|
228 const float **fbuffers = new const float *[channels];
|
c@75
|
229 for (int i = 0; i < channels; ++i) {
|
c@75
|
230 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
|
c@75
|
231 delete[] fbuffers;
|
c@75
|
232 throw runtime_error("wrong block size supplied to process");
|
c@75
|
233 }
|
c@75
|
234 fbuffers[i] = preq.inputBuffers[i].data();
|
c@75
|
235 }
|
c@75
|
236
|
c@75
|
237 response.processResponse.plugin = preq.plugin;
|
c@75
|
238 response.processResponse.features =
|
c@75
|
239 preq.plugin->process(fbuffers, preq.timestamp);
|
c@75
|
240 response.success = true;
|
c@75
|
241
|
c@75
|
242 delete[] fbuffers;
|
c@75
|
243 break;
|
c@75
|
244 }
|
c@75
|
245
|
c@75
|
246 case RRType::Finish:
|
c@75
|
247 {
|
c@77
|
248 auto &freq = request.finishRequest;
|
c@77
|
249 response.finishResponse.plugin = freq.plugin;
|
c@77
|
250
|
c@77
|
251 auto h = mapper.pluginToHandle(freq.plugin);
|
c@77
|
252 // Finish can be called (to unload the plugin) even if the
|
c@77
|
253 // plugin has never been configured or used. But we want to
|
c@77
|
254 // make sure we call getRemainingFeatures only if we have
|
c@77
|
255 // actually configured the plugin.
|
c@77
|
256 if (mapper.isConfigured(h)) {
|
c@77
|
257 response.finishResponse.features = freq.plugin->getRemainingFeatures();
|
c@77
|
258 }
|
c@75
|
259
|
c@75
|
260 // We do not delete the plugin here -- we need it in the
|
c@77
|
261 // mapper when converting the features. It gets deleted in the
|
c@77
|
262 // calling function.
|
c@75
|
263 response.success = true;
|
c@75
|
264 break;
|
c@75
|
265 }
|
c@75
|
266
|
c@75
|
267 case RRType::NotValid:
|
c@75
|
268 break;
|
c@75
|
269 }
|
c@75
|
270
|
c@75
|
271 return response;
|
c@75
|
272 }
|
c@75
|
273
|
c@103
|
274 int main(int argc, char **)
|
c@75
|
275 {
|
c@75
|
276 if (argc != 1) {
|
c@75
|
277 usage();
|
c@75
|
278 }
|
c@75
|
279
|
c@75
|
280 while (true) {
|
c@75
|
281
|
c@75
|
282 RequestOrResponse request;
|
c@75
|
283
|
c@75
|
284 try {
|
c@75
|
285
|
c@75
|
286 request = readRequestCapnp();
|
c@75
|
287
|
c@103
|
288 cerr << "piper-vamp-server " << pid << ": request received, of type "
|
c@75
|
289 << int(request.type)
|
c@75
|
290 << endl;
|
c@75
|
291
|
c@75
|
292 // NotValid without an exception indicates EOF:
|
c@75
|
293 if (request.type == RRType::NotValid) {
|
c@103
|
294 cerr << "piper-vamp-server " << pid << ": eof reached, exiting" << endl;
|
c@75
|
295 break;
|
c@75
|
296 }
|
c@75
|
297
|
c@75
|
298 RequestOrResponse response = handleRequest(request);
|
c@75
|
299 response.id = request.id;
|
c@75
|
300
|
c@103
|
301 cerr << "piper-vamp-server " << pid << ": request handled, writing response"
|
c@75
|
302 << endl;
|
c@75
|
303
|
c@75
|
304 writeResponseCapnp(response);
|
c@75
|
305
|
c@103
|
306 cerr << "piper-vamp-server " << pid << ": response written" << endl;
|
c@75
|
307
|
c@75
|
308 if (request.type == RRType::Finish) {
|
c@75
|
309 auto h = mapper.pluginToHandle(request.finishRequest.plugin);
|
c@103
|
310 cerr << "piper-vamp-server " << pid << ": deleting the plugin with handle " << h << endl;
|
c@75
|
311 mapper.removePlugin(h);
|
c@75
|
312 delete request.finishRequest.plugin;
|
c@75
|
313 }
|
c@75
|
314
|
c@75
|
315 } catch (std::exception &e) {
|
c@75
|
316
|
c@103
|
317 cerr << "piper-vamp-server " << pid << ": error: " << e.what() << endl;
|
c@75
|
318
|
c@75
|
319 writeExceptionCapnp(e, request.type);
|
c@75
|
320
|
c@75
|
321 exit(1);
|
c@75
|
322 }
|
c@75
|
323 }
|
c@75
|
324
|
c@75
|
325 exit(0);
|
c@75
|
326 }
|