c@31
|
1
|
c@31
|
2 #include "VampnProto.h"
|
c@31
|
3
|
c@31
|
4 #include "bits/RequestOrResponse.h"
|
c@31
|
5
|
c@31
|
6 #include <iostream>
|
c@31
|
7 #include <sstream>
|
c@31
|
8 #include <stdexcept>
|
c@31
|
9
|
c@32
|
10 #include <map>
|
c@32
|
11 #include <set>
|
c@32
|
12
|
c@31
|
13 using namespace std;
|
c@31
|
14 using namespace vampipe;
|
c@32
|
15 using namespace Vamp;
|
c@32
|
16 using namespace Vamp::HostExt;
|
c@31
|
17
|
c@31
|
18 void usage()
|
c@31
|
19 {
|
c@31
|
20 string myname = "vampipe-server";
|
c@31
|
21 cerr << "\n" << myname <<
|
c@31
|
22 ": Load and run Vamp plugins in response to messages from stdin\n\n"
|
c@31
|
23 " Usage: " << myname << "\n\n"
|
c@31
|
24 "Expects Vamp request messages in Cap'n Proto packed format on stdin,\n"
|
c@31
|
25 "and writes Vamp response messages in the same format to stdout.\n\n";
|
c@31
|
26
|
c@31
|
27 exit(2);
|
c@31
|
28 }
|
c@31
|
29
|
c@32
|
30 class Mapper : public PluginHandleMapper
|
c@32
|
31 {
|
c@32
|
32 public:
|
c@32
|
33 Mapper() : m_nextHandle(1) { }
|
c@32
|
34
|
c@32
|
35 void addPlugin(Plugin *p) {
|
c@32
|
36 if (m_rplugins.find(p) == m_rplugins.end()) {
|
c@32
|
37 int32_t h = m_nextHandle++;
|
c@32
|
38 m_plugins[h] = p;
|
c@32
|
39 m_rplugins[p] = h;
|
c@32
|
40 }
|
c@32
|
41 }
|
c@33
|
42
|
c@33
|
43 void removePlugin(int32_t h) {
|
c@33
|
44 if (m_plugins.find(h) == m_plugins.end()) {
|
c@33
|
45 throw NotFound();
|
c@33
|
46 }
|
c@33
|
47 Plugin *p = m_plugins[h];
|
c@33
|
48 m_plugins.erase(h);
|
c@33
|
49 m_rplugins.erase(p);
|
c@33
|
50 }
|
c@32
|
51
|
c@32
|
52 int32_t pluginToHandle(Plugin *p) {
|
c@32
|
53 if (m_rplugins.find(p) == m_rplugins.end()) {
|
c@32
|
54 throw NotFound();
|
c@32
|
55 }
|
c@32
|
56 return m_rplugins[p];
|
c@32
|
57 }
|
c@32
|
58
|
c@32
|
59 Plugin *handleToPlugin(int32_t h) {
|
c@32
|
60 if (m_plugins.find(h) == m_plugins.end()) {
|
c@32
|
61 throw NotFound();
|
c@32
|
62 }
|
c@32
|
63 return m_plugins[h];
|
c@32
|
64 }
|
c@32
|
65
|
c@33
|
66 bool isConfigured(int32_t h) {
|
c@33
|
67 return m_configuredPlugins.find(h) != m_configuredPlugins.end();
|
c@32
|
68 }
|
c@32
|
69
|
c@33
|
70 void markConfigured(int32_t h) {
|
c@33
|
71 m_configuredPlugins.insert(h);
|
c@32
|
72 }
|
c@32
|
73
|
c@32
|
74 private:
|
c@32
|
75 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number
|
c@32
|
76 map<uint32_t, Plugin *> m_plugins;
|
c@32
|
77 map<Plugin *, uint32_t> m_rplugins;
|
c@33
|
78 set<uint32_t> m_configuredPlugins;
|
c@32
|
79 };
|
c@32
|
80
|
c@32
|
81 static Mapper mapper;
|
c@32
|
82
|
c@31
|
83 RequestOrResponse
|
c@31
|
84 readRequestCapnp()
|
c@31
|
85 {
|
c@31
|
86 RequestOrResponse rr;
|
c@31
|
87 rr.direction = RequestOrResponse::Request;
|
c@31
|
88
|
c@33
|
89 static kj::FdInputStream stream(0); // stdin
|
c@33
|
90 static kj::BufferedInputStreamWrapper buffered(stream);
|
c@33
|
91
|
c@33
|
92 if (buffered.tryGetReadBuffer() == nullptr) {
|
c@33
|
93 rr.type = RRType::NotValid;
|
c@33
|
94 return rr;
|
c@33
|
95 }
|
c@33
|
96
|
c@33
|
97 ::capnp::InputStreamMessageReader message(buffered);
|
c@31
|
98 VampRequest::Reader reader = message.getRoot<VampRequest>();
|
c@31
|
99
|
c@31
|
100 rr.type = VampnProto::getRequestResponseType(reader);
|
c@31
|
101
|
c@31
|
102 switch (rr.type) {
|
c@31
|
103
|
c@31
|
104 case RRType::List:
|
c@31
|
105 VampnProto::readVampRequest_List(reader); // type check only
|
c@31
|
106 break;
|
c@31
|
107 case RRType::Load:
|
c@31
|
108 VampnProto::readVampRequest_Load(rr.loadRequest, reader);
|
c@31
|
109 break;
|
c@31
|
110 case RRType::Configure:
|
c@32
|
111 VampnProto::readVampRequest_Configure(rr.configurationRequest,
|
c@32
|
112 reader, mapper);
|
c@31
|
113 break;
|
c@31
|
114 case RRType::Process:
|
c@32
|
115 VampnProto::readVampRequest_Process(rr.processRequest, reader, mapper);
|
c@31
|
116 break;
|
c@31
|
117 case RRType::Finish:
|
c@32
|
118 VampnProto::readVampRequest_Finish(rr.finishPlugin, reader, mapper);
|
c@31
|
119 break;
|
c@31
|
120 case RRType::NotValid:
|
c@31
|
121 break;
|
c@31
|
122 }
|
c@31
|
123
|
c@31
|
124 return rr;
|
c@31
|
125 }
|
c@31
|
126
|
c@31
|
127 void
|
c@31
|
128 writeResponseCapnp(RequestOrResponse &rr)
|
c@31
|
129 {
|
c@31
|
130 ::capnp::MallocMessageBuilder message;
|
c@31
|
131 VampResponse::Builder builder = message.initRoot<VampResponse>();
|
c@31
|
132
|
c@31
|
133 switch (rr.type) {
|
c@31
|
134
|
c@31
|
135 case RRType::List:
|
c@31
|
136 VampnProto::buildVampResponse_List(builder, "", rr.listResponse);
|
c@31
|
137 break;
|
c@31
|
138 case RRType::Load:
|
c@32
|
139 VampnProto::buildVampResponse_Load(builder, rr.loadResponse, mapper);
|
c@31
|
140 break;
|
c@31
|
141 case RRType::Configure:
|
c@31
|
142 VampnProto::buildVampResponse_Configure(builder, rr.configurationResponse);
|
c@31
|
143 break;
|
c@31
|
144 case RRType::Process:
|
c@31
|
145 VampnProto::buildVampResponse_Process(builder, rr.processResponse);
|
c@31
|
146 break;
|
c@31
|
147 case RRType::Finish:
|
c@31
|
148 VampnProto::buildVampResponse_Finish(builder, rr.finishResponse);
|
c@31
|
149 break;
|
c@31
|
150 case RRType::NotValid:
|
c@31
|
151 break;
|
c@31
|
152 }
|
c@31
|
153
|
c@33
|
154 writeMessageToFd(1, message);
|
c@31
|
155 }
|
c@31
|
156
|
c@31
|
157 RequestOrResponse
|
c@31
|
158 processRequest(const RequestOrResponse &request)
|
c@31
|
159 {
|
c@31
|
160 RequestOrResponse response;
|
c@31
|
161 response.direction = RequestOrResponse::Response;
|
c@32
|
162 response.type = request.type;
|
c@32
|
163
|
c@32
|
164 auto loader = PluginLoader::getInstance();
|
c@32
|
165
|
c@32
|
166 switch (request.type) {
|
c@32
|
167
|
c@32
|
168 case RRType::List:
|
c@32
|
169 response.listResponse = loader->listPluginData();
|
c@32
|
170 response.success = true;
|
c@32
|
171 break;
|
c@32
|
172
|
c@32
|
173 case RRType::Load:
|
c@32
|
174 response.loadResponse = loader->loadPlugin(request.loadRequest);
|
c@32
|
175 if (response.loadResponse.plugin != nullptr) {
|
c@32
|
176 mapper.addPlugin(response.loadResponse.plugin);
|
c@32
|
177 response.success = true;
|
c@32
|
178 }
|
c@32
|
179 break;
|
c@32
|
180
|
c@33
|
181 case RRType::Configure:
|
c@33
|
182 {
|
c@33
|
183 auto h = mapper.pluginToHandle(request.configurationRequest.plugin);
|
c@33
|
184 if (mapper.isConfigured(h)) {
|
c@33
|
185 throw runtime_error("plugin has already been configured");
|
c@33
|
186 }
|
c@33
|
187
|
c@33
|
188 response.configurationResponse =
|
c@33
|
189 loader->configurePlugin(request.configurationRequest);
|
c@33
|
190
|
c@33
|
191 if (!response.configurationResponse.outputs.empty()) {
|
c@33
|
192 mapper.markConfigured(h);
|
c@33
|
193 response.success = true;
|
c@33
|
194 }
|
c@33
|
195 break;
|
c@33
|
196 }
|
c@33
|
197
|
c@33
|
198 case RRType::Process:
|
c@33
|
199 {
|
c@33
|
200 auto &preq = request.processRequest;
|
c@33
|
201 int channels = int(preq.inputBuffers.size());
|
c@33
|
202 const float **fbuffers = new const float *[channels];
|
c@33
|
203 for (int i = 0; i < channels; ++i) {
|
c@33
|
204 fbuffers[i] = preq.inputBuffers[i].data();
|
c@33
|
205 }
|
c@33
|
206
|
c@33
|
207 response.processResponse.features =
|
c@33
|
208 preq.plugin->process(fbuffers, preq.timestamp);
|
c@33
|
209 response.success = true;
|
c@33
|
210
|
c@33
|
211 delete[] fbuffers;
|
c@33
|
212 break;
|
c@33
|
213 }
|
c@33
|
214
|
c@33
|
215 case RRType::Finish:
|
c@33
|
216 {
|
c@33
|
217 auto h = mapper.pluginToHandle(request.finishPlugin);
|
c@33
|
218
|
c@33
|
219 response.finishResponse.features =
|
c@33
|
220 request.finishPlugin->getRemainingFeatures();
|
c@33
|
221
|
c@33
|
222 mapper.removePlugin(h);
|
c@33
|
223 delete request.finishPlugin;
|
c@33
|
224 response.success = true;
|
c@33
|
225 break;
|
c@33
|
226 }
|
c@33
|
227
|
c@33
|
228 case RRType::NotValid:
|
c@33
|
229 break;
|
c@32
|
230 }
|
c@32
|
231
|
c@31
|
232 return response;
|
c@31
|
233 }
|
c@31
|
234
|
c@31
|
235 int main(int argc, char **argv)
|
c@31
|
236 {
|
c@31
|
237 if (argc != 1) {
|
c@31
|
238 usage();
|
c@31
|
239 }
|
c@31
|
240
|
c@31
|
241 while (true) {
|
c@31
|
242
|
c@31
|
243 try {
|
c@31
|
244
|
c@31
|
245 RequestOrResponse request = readRequestCapnp();
|
c@31
|
246
|
c@33
|
247 cerr << "vampipe-server: request received, of type "
|
c@33
|
248 << int(request.type)
|
c@33
|
249 << endl;
|
c@33
|
250
|
c@31
|
251 // NotValid without an exception indicates EOF:
|
c@33
|
252 if (request.type == RRType::NotValid) {
|
c@33
|
253 cerr << "vampipe-server: eof reached" << endl;
|
c@33
|
254 break;
|
c@33
|
255 }
|
c@31
|
256
|
c@31
|
257 RequestOrResponse response = processRequest(request);
|
c@33
|
258
|
c@33
|
259 cerr << "vampipe-server: request processed, writing response"
|
c@33
|
260 << endl;
|
c@31
|
261
|
c@31
|
262 writeResponseCapnp(response);
|
c@33
|
263
|
c@33
|
264 cerr << "vampipe-server: response written" << endl;
|
c@31
|
265
|
c@31
|
266 } catch (std::exception &e) {
|
c@33
|
267 cerr << "vampipe-server: error: " << e.what() << endl;
|
c@31
|
268 exit(1);
|
c@31
|
269 }
|
c@31
|
270 }
|
c@31
|
271
|
c@31
|
272 exit(0);
|
c@31
|
273 }
|