c@31
|
1
|
#include "VampnProto.h"

#include "bits/RequestOrResponse.h"

#include <iostream>
#include <sstream>
#include <stdexcept>

#include <map>
#include <set>
#include <vector>
c@32
|
12
|
c@31
|
13 using namespace std;
|
c@31
|
14 using namespace vampipe;
|
c@32
|
15 using namespace Vamp;
|
c@32
|
16 using namespace Vamp::HostExt;
|
c@31
|
17
|
// Print a short description of the program and how to invoke it to
// stderr, then terminate the process with exit status 2. Never returns.
//
// NOTE(review): the help text says "packed format", but the reader and
// writer used elsewhere in this file (InputStreamMessageReader /
// writeMessageToFd) look like Cap'n Proto's unpacked stream variants --
// confirm which encoding is actually intended.
void usage()
{
    const std::string programName = "vampipe-server";

    std::cerr << "\n" << programName
              << ": Load and run Vamp plugins in response to messages from stdin\n\n";

    std::cerr << " Usage: " << programName << "\n\n";

    std::cerr << "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
              << "and writes Vamp response messages in the same format to stdout.\n\n";

    exit(2);
}
c@31
|
29
|
c@32
|
30 class Mapper : public PluginHandleMapper
|
c@32
|
31 {
|
c@32
|
32 public:
|
c@32
|
33 Mapper() : m_nextHandle(1) { }
|
c@32
|
34
|
c@32
|
35 void addPlugin(Plugin *p) {
|
c@32
|
36 if (m_rplugins.find(p) == m_rplugins.end()) {
|
c@32
|
37 int32_t h = m_nextHandle++;
|
c@32
|
38 m_plugins[h] = p;
|
c@32
|
39 m_rplugins[p] = h;
|
c@32
|
40 }
|
c@32
|
41 }
|
c@33
|
42
|
c@33
|
43 void removePlugin(int32_t h) {
|
c@33
|
44 if (m_plugins.find(h) == m_plugins.end()) {
|
c@33
|
45 throw NotFound();
|
c@33
|
46 }
|
c@33
|
47 Plugin *p = m_plugins[h];
|
c@33
|
48 m_plugins.erase(h);
|
c@34
|
49 if (isConfigured(h)) {
|
c@34
|
50 m_configuredPlugins.erase(h);
|
c@34
|
51 m_channelCounts.erase(h);
|
c@34
|
52 }
|
c@33
|
53 m_rplugins.erase(p);
|
c@33
|
54 }
|
c@32
|
55
|
c@32
|
56 int32_t pluginToHandle(Plugin *p) {
|
c@32
|
57 if (m_rplugins.find(p) == m_rplugins.end()) {
|
c@32
|
58 throw NotFound();
|
c@32
|
59 }
|
c@32
|
60 return m_rplugins[p];
|
c@32
|
61 }
|
c@32
|
62
|
c@32
|
63 Plugin *handleToPlugin(int32_t h) {
|
c@32
|
64 if (m_plugins.find(h) == m_plugins.end()) {
|
c@32
|
65 throw NotFound();
|
c@32
|
66 }
|
c@32
|
67 return m_plugins[h];
|
c@32
|
68 }
|
c@32
|
69
|
c@33
|
70 bool isConfigured(int32_t h) {
|
c@33
|
71 return m_configuredPlugins.find(h) != m_configuredPlugins.end();
|
c@32
|
72 }
|
c@32
|
73
|
c@34
|
74 void markConfigured(int32_t h, int channelCount, int blockSize) {
|
c@33
|
75 m_configuredPlugins.insert(h);
|
c@34
|
76 m_channelCounts[h] = channelCount;
|
c@34
|
77 m_blockSizes[h] = blockSize;
|
c@34
|
78 }
|
c@34
|
79
|
c@34
|
80 int getChannelCount(int32_t h) {
|
c@34
|
81 if (m_channelCounts.find(h) == m_channelCounts.end()) {
|
c@34
|
82 throw NotFound();
|
c@34
|
83 }
|
c@34
|
84 return m_channelCounts[h];
|
c@34
|
85 }
|
c@34
|
86
|
c@34
|
87 int getBlockSize(int32_t h) {
|
c@34
|
88 if (m_blockSizes.find(h) == m_blockSizes.end()) {
|
c@34
|
89 throw NotFound();
|
c@34
|
90 }
|
c@34
|
91 return m_blockSizes[h];
|
c@32
|
92 }
|
c@32
|
93
|
c@32
|
94 private:
|
c@32
|
95 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number
|
c@32
|
96 map<uint32_t, Plugin *> m_plugins;
|
c@32
|
97 map<Plugin *, uint32_t> m_rplugins;
|
c@33
|
98 set<uint32_t> m_configuredPlugins;
|
c@34
|
99 map<uint32_t, int> m_channelCounts;
|
c@34
|
100 map<uint32_t, int> m_blockSizes;
|
c@32
|
101 };
|
c@32
|
102
|
c@32
|
103 static Mapper mapper;
|
c@32
|
104
|
c@31
|
105 RequestOrResponse
|
c@31
|
106 readRequestCapnp()
|
c@31
|
107 {
|
c@31
|
108 RequestOrResponse rr;
|
c@31
|
109 rr.direction = RequestOrResponse::Request;
|
c@31
|
110
|
c@33
|
111 static kj::FdInputStream stream(0); // stdin
|
c@33
|
112 static kj::BufferedInputStreamWrapper buffered(stream);
|
c@33
|
113
|
c@33
|
114 if (buffered.tryGetReadBuffer() == nullptr) {
|
c@33
|
115 rr.type = RRType::NotValid;
|
c@33
|
116 return rr;
|
c@33
|
117 }
|
c@33
|
118
|
c@33
|
119 ::capnp::InputStreamMessageReader message(buffered);
|
c@31
|
120 VampRequest::Reader reader = message.getRoot<VampRequest>();
|
c@31
|
121
|
c@31
|
122 rr.type = VampnProto::getRequestResponseType(reader);
|
c@31
|
123
|
c@31
|
124 switch (rr.type) {
|
c@31
|
125
|
c@31
|
126 case RRType::List:
|
c@31
|
127 VampnProto::readVampRequest_List(reader); // type check only
|
c@31
|
128 break;
|
c@31
|
129 case RRType::Load:
|
c@31
|
130 VampnProto::readVampRequest_Load(rr.loadRequest, reader);
|
c@31
|
131 break;
|
c@31
|
132 case RRType::Configure:
|
c@32
|
133 VampnProto::readVampRequest_Configure(rr.configurationRequest,
|
c@32
|
134 reader, mapper);
|
c@31
|
135 break;
|
c@31
|
136 case RRType::Process:
|
c@32
|
137 VampnProto::readVampRequest_Process(rr.processRequest, reader, mapper);
|
c@31
|
138 break;
|
c@31
|
139 case RRType::Finish:
|
c@32
|
140 VampnProto::readVampRequest_Finish(rr.finishPlugin, reader, mapper);
|
c@31
|
141 break;
|
c@31
|
142 case RRType::NotValid:
|
c@31
|
143 break;
|
c@31
|
144 }
|
c@31
|
145
|
c@31
|
146 return rr;
|
c@31
|
147 }
|
c@31
|
148
|
c@31
|
149 void
|
c@31
|
150 writeResponseCapnp(RequestOrResponse &rr)
|
c@31
|
151 {
|
c@31
|
152 ::capnp::MallocMessageBuilder message;
|
c@31
|
153 VampResponse::Builder builder = message.initRoot<VampResponse>();
|
c@31
|
154
|
c@31
|
155 switch (rr.type) {
|
c@31
|
156
|
c@31
|
157 case RRType::List:
|
c@31
|
158 VampnProto::buildVampResponse_List(builder, "", rr.listResponse);
|
c@31
|
159 break;
|
c@31
|
160 case RRType::Load:
|
c@32
|
161 VampnProto::buildVampResponse_Load(builder, rr.loadResponse, mapper);
|
c@31
|
162 break;
|
c@31
|
163 case RRType::Configure:
|
c@31
|
164 VampnProto::buildVampResponse_Configure(builder, rr.configurationResponse);
|
c@31
|
165 break;
|
c@31
|
166 case RRType::Process:
|
c@31
|
167 VampnProto::buildVampResponse_Process(builder, rr.processResponse);
|
c@31
|
168 break;
|
c@31
|
169 case RRType::Finish:
|
c@31
|
170 VampnProto::buildVampResponse_Finish(builder, rr.finishResponse);
|
c@31
|
171 break;
|
c@31
|
172 case RRType::NotValid:
|
c@31
|
173 break;
|
c@31
|
174 }
|
c@31
|
175
|
c@33
|
176 writeMessageToFd(1, message);
|
c@31
|
177 }
|
c@31
|
178
|
c@31
|
179 RequestOrResponse
|
c@34
|
180 handleRequest(const RequestOrResponse &request)
|
c@31
|
181 {
|
c@31
|
182 RequestOrResponse response;
|
c@31
|
183 response.direction = RequestOrResponse::Response;
|
c@32
|
184 response.type = request.type;
|
c@32
|
185
|
c@32
|
186 auto loader = PluginLoader::getInstance();
|
c@32
|
187
|
c@32
|
188 switch (request.type) {
|
c@32
|
189
|
c@32
|
190 case RRType::List:
|
c@32
|
191 response.listResponse = loader->listPluginData();
|
c@32
|
192 response.success = true;
|
c@32
|
193 break;
|
c@32
|
194
|
c@32
|
195 case RRType::Load:
|
c@32
|
196 response.loadResponse = loader->loadPlugin(request.loadRequest);
|
c@32
|
197 if (response.loadResponse.plugin != nullptr) {
|
c@32
|
198 mapper.addPlugin(response.loadResponse.plugin);
|
c@32
|
199 response.success = true;
|
c@32
|
200 }
|
c@32
|
201 break;
|
c@32
|
202
|
c@33
|
203 case RRType::Configure:
|
c@33
|
204 {
|
c@34
|
205 auto &creq = request.configurationRequest;
|
c@34
|
206 auto h = mapper.pluginToHandle(creq.plugin);
|
c@33
|
207 if (mapper.isConfigured(h)) {
|
c@33
|
208 throw runtime_error("plugin has already been configured");
|
c@33
|
209 }
|
c@33
|
210
|
c@34
|
211 response.configurationResponse = loader->configurePlugin(creq);
|
c@33
|
212
|
c@33
|
213 if (!response.configurationResponse.outputs.empty()) {
|
c@34
|
214 mapper.markConfigured
|
c@34
|
215 (h, creq.configuration.channelCount, creq.configuration.blockSize);
|
c@33
|
216 response.success = true;
|
c@33
|
217 }
|
c@33
|
218 break;
|
c@33
|
219 }
|
c@33
|
220
|
c@33
|
221 case RRType::Process:
|
c@33
|
222 {
|
c@33
|
223 auto &preq = request.processRequest;
|
c@34
|
224 auto h = mapper.pluginToHandle(preq.plugin);
|
c@34
|
225 if (!mapper.isConfigured(h)) {
|
c@34
|
226 throw runtime_error("plugin has not been configured");
|
c@34
|
227 }
|
c@34
|
228
|
c@33
|
229 int channels = int(preq.inputBuffers.size());
|
c@34
|
230 if (channels != mapper.getChannelCount(h)) {
|
c@34
|
231 throw runtime_error("wrong number of channels supplied to process");
|
c@34
|
232 }
|
c@34
|
233
|
c@33
|
234 const float **fbuffers = new const float *[channels];
|
c@33
|
235 for (int i = 0; i < channels; ++i) {
|
c@34
|
236 if (int(preq.inputBuffers[i].size()) != mapper.getBlockSize(h)) {
|
c@34
|
237 delete[] fbuffers;
|
c@34
|
238 throw runtime_error("wrong block size supplied to process");
|
c@34
|
239 }
|
c@33
|
240 fbuffers[i] = preq.inputBuffers[i].data();
|
c@33
|
241 }
|
c@33
|
242
|
c@33
|
243 response.processResponse.features =
|
c@33
|
244 preq.plugin->process(fbuffers, preq.timestamp);
|
c@33
|
245 response.success = true;
|
c@33
|
246
|
c@33
|
247 delete[] fbuffers;
|
c@33
|
248 break;
|
c@33
|
249 }
|
c@33
|
250
|
c@33
|
251 case RRType::Finish:
|
c@33
|
252 {
|
c@33
|
253 auto h = mapper.pluginToHandle(request.finishPlugin);
|
c@33
|
254
|
c@33
|
255 response.finishResponse.features =
|
c@33
|
256 request.finishPlugin->getRemainingFeatures();
|
c@33
|
257
|
c@33
|
258 mapper.removePlugin(h);
|
c@33
|
259 delete request.finishPlugin;
|
c@33
|
260 response.success = true;
|
c@33
|
261 break;
|
c@33
|
262 }
|
c@33
|
263
|
c@33
|
264 case RRType::NotValid:
|
c@33
|
265 break;
|
c@32
|
266 }
|
c@32
|
267
|
c@31
|
268 return response;
|
c@31
|
269 }
|
c@31
|
270
|
c@31
|
271 int main(int argc, char **argv)
|
c@31
|
272 {
|
c@31
|
273 if (argc != 1) {
|
c@31
|
274 usage();
|
c@31
|
275 }
|
c@31
|
276
|
c@31
|
277 while (true) {
|
c@31
|
278
|
c@31
|
279 try {
|
c@31
|
280
|
c@31
|
281 RequestOrResponse request = readRequestCapnp();
|
c@31
|
282
|
c@33
|
283 cerr << "vampipe-server: request received, of type "
|
c@33
|
284 << int(request.type)
|
c@33
|
285 << endl;
|
c@33
|
286
|
c@31
|
287 // NotValid without an exception indicates EOF:
|
c@33
|
288 if (request.type == RRType::NotValid) {
|
c@33
|
289 cerr << "vampipe-server: eof reached" << endl;
|
c@33
|
290 break;
|
c@33
|
291 }
|
c@31
|
292
|
c@34
|
293 RequestOrResponse response = handleRequest(request);
|
c@33
|
294
|
c@34
|
295 cerr << "vampipe-server: request handled, writing response"
|
c@33
|
296 << endl;
|
c@31
|
297
|
c@31
|
298 writeResponseCapnp(response);
|
c@33
|
299
|
c@33
|
300 cerr << "vampipe-server: response written" << endl;
|
c@31
|
301
|
c@31
|
302 } catch (std::exception &e) {
|
c@33
|
303 cerr << "vampipe-server: error: " << e.what() << endl;
|
c@31
|
304 exit(1);
|
c@31
|
305 }
|
c@31
|
306 }
|
c@31
|
307
|
c@31
|
308 exit(0);
|
c@31
|
309 }
|