comparison utilities/vampipe-server.cpp @ 33:0b48b10140bb

Switch to non-packed protocol and handle multiple messages and EOF properly; fill in remaining server actions
author Chris Cannam <c.cannam@qmul.ac.uk>
date Wed, 25 May 2016 10:43:07 +0100
parents 2d97883d20df
children ba58fe5ee2dd
comparison
equal deleted inserted replaced
32:2d97883d20df 33:0b48b10140bb
37 int32_t h = m_nextHandle++; 37 int32_t h = m_nextHandle++;
38 m_plugins[h] = p; 38 m_plugins[h] = p;
39 m_rplugins[p] = h; 39 m_rplugins[p] = h;
40 } 40 }
41 } 41 }
42
43 void removePlugin(int32_t h) {
44 if (m_plugins.find(h) == m_plugins.end()) {
45 throw NotFound();
46 }
47 Plugin *p = m_plugins[h];
48 m_plugins.erase(h);
49 m_rplugins.erase(p);
50 }
42 51
43 int32_t pluginToHandle(Plugin *p) { 52 int32_t pluginToHandle(Plugin *p) {
44 if (m_rplugins.find(p) == m_rplugins.end()) { 53 if (m_rplugins.find(p) == m_rplugins.end()) {
45 throw NotFound(); 54 throw NotFound();
46 } 55 }
52 throw NotFound(); 61 throw NotFound();
53 } 62 }
54 return m_plugins[h]; 63 return m_plugins[h];
55 } 64 }
56 65
57 bool isInitialised(int32_t h) { 66 bool isConfigured(int32_t h) {
58 return m_initialisedPlugins.find(h) != m_initialisedPlugins.end(); 67 return m_configuredPlugins.find(h) != m_configuredPlugins.end();
59 } 68 }
60 69
61 void markInitialised(int32_t h) { 70 void markConfigured(int32_t h) {
62 m_initialisedPlugins.insert(h); 71 m_configuredPlugins.insert(h);
63 } 72 }
64 73
65 private: 74 private:
66 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number 75 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number
67 map<uint32_t, Plugin *> m_plugins; 76 map<uint32_t, Plugin *> m_plugins;
68 map<Plugin *, uint32_t> m_rplugins; 77 map<Plugin *, uint32_t> m_rplugins;
69 set<uint32_t> m_initialisedPlugins; 78 set<uint32_t> m_configuredPlugins;
70 }; 79 };
71 80
72 static Mapper mapper; 81 static Mapper mapper;
73 82
74 RequestOrResponse 83 RequestOrResponse
75 readRequestCapnp() 84 readRequestCapnp()
76 { 85 {
77 RequestOrResponse rr; 86 RequestOrResponse rr;
78 rr.direction = RequestOrResponse::Request; 87 rr.direction = RequestOrResponse::Request;
79 88
80 ::capnp::PackedFdMessageReader message(0); // stdin 89 static kj::FdInputStream stream(0); // stdin
90 static kj::BufferedInputStreamWrapper buffered(stream);
91
92 if (buffered.tryGetReadBuffer() == nullptr) {
93 rr.type = RRType::NotValid;
94 return rr;
95 }
96
97 ::capnp::InputStreamMessageReader message(buffered);
81 VampRequest::Reader reader = message.getRoot<VampRequest>(); 98 VampRequest::Reader reader = message.getRoot<VampRequest>();
82 99
83 rr.type = VampnProto::getRequestResponseType(reader); 100 rr.type = VampnProto::getRequestResponseType(reader);
84 101
85 switch (rr.type) { 102 switch (rr.type) {
132 break; 149 break;
133 case RRType::NotValid: 150 case RRType::NotValid:
134 break; 151 break;
135 } 152 }
136 153
137 writePackedMessageToFd(1, message); 154 writeMessageToFd(1, message);
138 } 155 }
139 156
140 RequestOrResponse 157 RequestOrResponse
141 processRequest(const RequestOrResponse &request) 158 processRequest(const RequestOrResponse &request)
142 { 159 {
159 mapper.addPlugin(response.loadResponse.plugin); 176 mapper.addPlugin(response.loadResponse.plugin);
160 response.success = true; 177 response.success = true;
161 } 178 }
162 break; 179 break;
163 180
164 default: 181 case RRType::Configure:
165 //!!! 182 {
166 ; 183 auto h = mapper.pluginToHandle(request.configurationRequest.plugin);
184 if (mapper.isConfigured(h)) {
185 throw runtime_error("plugin has already been configured");
186 }
187
188 response.configurationResponse =
189 loader->configurePlugin(request.configurationRequest);
190
191 if (!response.configurationResponse.outputs.empty()) {
192 mapper.markConfigured(h);
193 response.success = true;
194 }
195 break;
196 }
197
198 case RRType::Process:
199 {
200 auto &preq = request.processRequest;
201 int channels = int(preq.inputBuffers.size());
202 const float **fbuffers = new const float *[channels];
203 for (int i = 0; i < channels; ++i) {
204 fbuffers[i] = preq.inputBuffers[i].data();
205 }
206
207 response.processResponse.features =
208 preq.plugin->process(fbuffers, preq.timestamp);
209 response.success = true;
210
211 delete[] fbuffers;
212 break;
213 }
214
215 case RRType::Finish:
216 {
217 auto h = mapper.pluginToHandle(request.finishPlugin);
218
219 response.finishResponse.features =
220 request.finishPlugin->getRemainingFeatures();
221
222 mapper.removePlugin(h);
223 delete request.finishPlugin;
224 response.success = true;
225 break;
226 }
227
228 case RRType::NotValid:
229 break;
167 } 230 }
168 231
169 return response; 232 return response;
170 } 233 }
171 234
179 242
180 try { 243 try {
181 244
182 RequestOrResponse request = readRequestCapnp(); 245 RequestOrResponse request = readRequestCapnp();
183 246
247 cerr << "vampipe-server: request received, of type "
248 << int(request.type)
249 << endl;
250
184 // NotValid without an exception indicates EOF: 251 // NotValid without an exception indicates EOF:
185 252 if (request.type == RRType::NotValid) {
186 //!!! not yet it doesn't -- have to figure out how to 253 cerr << "vampipe-server: eof reached" << endl;
187 //!!! handle this with capnp 254 break;
188 if (request.type == RRType::NotValid) break; 255 }
189 256
190 RequestOrResponse response = processRequest(request); 257 RequestOrResponse response = processRequest(request);
258
259 cerr << "vampipe-server: request processed, writing response"
260 << endl;
191 261
192 writeResponseCapnp(response); 262 writeResponseCapnp(response);
263
264 cerr << "vampipe-server: response written" << endl;
193 265
194 } catch (std::exception &e) { 266 } catch (std::exception &e) {
195 cerr << "Error: " << e.what() << endl; 267 cerr << "vampipe-server: error: " << e.what() << endl;
196 exit(1); 268 exit(1);
197 } 269 }
198 } 270 }
199 271
200 exit(0); 272 exit(0);