Mercurial > hg > piper-cpp
comparison utilities/vampipe-server.cpp @ 33:0b48b10140bb
Switch to non-packed protocol and handle multiple messages and EOF properly; fill in remaining server actions
author | Chris Cannam <c.cannam@qmul.ac.uk> |
---|---|
date | Wed, 25 May 2016 10:43:07 +0100 |
parents | 2d97883d20df |
children | ba58fe5ee2dd |
comparison
equal
deleted
inserted
replaced
32:2d97883d20df | 33:0b48b10140bb |
---|---|
37 int32_t h = m_nextHandle++; | 37 int32_t h = m_nextHandle++; |
38 m_plugins[h] = p; | 38 m_plugins[h] = p; |
39 m_rplugins[p] = h; | 39 m_rplugins[p] = h; |
40 } | 40 } |
41 } | 41 } |
42 | |
43 void removePlugin(int32_t h) { | |
44 if (m_plugins.find(h) == m_plugins.end()) { | |
45 throw NotFound(); | |
46 } | |
47 Plugin *p = m_plugins[h]; | |
48 m_plugins.erase(h); | |
49 m_rplugins.erase(p); | |
50 } | |
42 | 51 |
43 int32_t pluginToHandle(Plugin *p) { | 52 int32_t pluginToHandle(Plugin *p) { |
44 if (m_rplugins.find(p) == m_rplugins.end()) { | 53 if (m_rplugins.find(p) == m_rplugins.end()) { |
45 throw NotFound(); | 54 throw NotFound(); |
46 } | 55 } |
52 throw NotFound(); | 61 throw NotFound(); |
53 } | 62 } |
54 return m_plugins[h]; | 63 return m_plugins[h]; |
55 } | 64 } |
56 | 65 |
57 bool isInitialised(int32_t h) { | 66 bool isConfigured(int32_t h) { |
58 return m_initialisedPlugins.find(h) != m_initialisedPlugins.end(); | 67 return m_configuredPlugins.find(h) != m_configuredPlugins.end(); |
59 } | 68 } |
60 | 69 |
61 void markInitialised(int32_t h) { | 70 void markConfigured(int32_t h) { |
62 m_initialisedPlugins.insert(h); | 71 m_configuredPlugins.insert(h); |
63 } | 72 } |
64 | 73 |
65 private: | 74 private: |
66 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number | 75 int32_t m_nextHandle; // NB plugin handle type must fit in JSON number |
67 map<uint32_t, Plugin *> m_plugins; | 76 map<uint32_t, Plugin *> m_plugins; |
68 map<Plugin *, uint32_t> m_rplugins; | 77 map<Plugin *, uint32_t> m_rplugins; |
69 set<uint32_t> m_initialisedPlugins; | 78 set<uint32_t> m_configuredPlugins; |
70 }; | 79 }; |
71 | 80 |
72 static Mapper mapper; | 81 static Mapper mapper; |
73 | 82 |
74 RequestOrResponse | 83 RequestOrResponse |
75 readRequestCapnp() | 84 readRequestCapnp() |
76 { | 85 { |
77 RequestOrResponse rr; | 86 RequestOrResponse rr; |
78 rr.direction = RequestOrResponse::Request; | 87 rr.direction = RequestOrResponse::Request; |
79 | 88 |
80 ::capnp::PackedFdMessageReader message(0); // stdin | 89 static kj::FdInputStream stream(0); // stdin |
90 static kj::BufferedInputStreamWrapper buffered(stream); | |
91 | |
92 if (buffered.tryGetReadBuffer() == nullptr) { | |
93 rr.type = RRType::NotValid; | |
94 return rr; | |
95 } | |
96 | |
97 ::capnp::InputStreamMessageReader message(buffered); | |
81 VampRequest::Reader reader = message.getRoot<VampRequest>(); | 98 VampRequest::Reader reader = message.getRoot<VampRequest>(); |
82 | 99 |
83 rr.type = VampnProto::getRequestResponseType(reader); | 100 rr.type = VampnProto::getRequestResponseType(reader); |
84 | 101 |
85 switch (rr.type) { | 102 switch (rr.type) { |
132 break; | 149 break; |
133 case RRType::NotValid: | 150 case RRType::NotValid: |
134 break; | 151 break; |
135 } | 152 } |
136 | 153 |
137 writePackedMessageToFd(1, message); | 154 writeMessageToFd(1, message); |
138 } | 155 } |
139 | 156 |
140 RequestOrResponse | 157 RequestOrResponse |
141 processRequest(const RequestOrResponse &request) | 158 processRequest(const RequestOrResponse &request) |
142 { | 159 { |
159 mapper.addPlugin(response.loadResponse.plugin); | 176 mapper.addPlugin(response.loadResponse.plugin); |
160 response.success = true; | 177 response.success = true; |
161 } | 178 } |
162 break; | 179 break; |
163 | 180 |
164 default: | 181 case RRType::Configure: |
165 //!!! | 182 { |
166 ; | 183 auto h = mapper.pluginToHandle(request.configurationRequest.plugin); |
184 if (mapper.isConfigured(h)) { | |
185 throw runtime_error("plugin has already been configured"); | |
186 } | |
187 | |
188 response.configurationResponse = | |
189 loader->configurePlugin(request.configurationRequest); | |
190 | |
191 if (!response.configurationResponse.outputs.empty()) { | |
192 mapper.markConfigured(h); | |
193 response.success = true; | |
194 } | |
195 break; | |
196 } | |
197 | |
198 case RRType::Process: | |
199 { | |
200 auto &preq = request.processRequest; | |
201 int channels = int(preq.inputBuffers.size()); | |
202 const float **fbuffers = new const float *[channels]; | |
203 for (int i = 0; i < channels; ++i) { | |
204 fbuffers[i] = preq.inputBuffers[i].data(); | |
205 } | |
206 | |
207 response.processResponse.features = | |
208 preq.plugin->process(fbuffers, preq.timestamp); | |
209 response.success = true; | |
210 | |
211 delete[] fbuffers; | |
212 break; | |
213 } | |
214 | |
215 case RRType::Finish: | |
216 { | |
217 auto h = mapper.pluginToHandle(request.finishPlugin); | |
218 | |
219 response.finishResponse.features = | |
220 request.finishPlugin->getRemainingFeatures(); | |
221 | |
222 mapper.removePlugin(h); | |
223 delete request.finishPlugin; | |
224 response.success = true; | |
225 break; | |
226 } | |
227 | |
228 case RRType::NotValid: | |
229 break; | |
167 } | 230 } |
168 | 231 |
169 return response; | 232 return response; |
170 } | 233 } |
171 | 234 |
179 | 242 |
180 try { | 243 try { |
181 | 244 |
182 RequestOrResponse request = readRequestCapnp(); | 245 RequestOrResponse request = readRequestCapnp(); |
183 | 246 |
247 cerr << "vampipe-server: request received, of type " | |
248 << int(request.type) | |
249 << endl; | |
250 | |
184 // NotValid without an exception indicates EOF: | 251 // NotValid without an exception indicates EOF: |
185 | 252 if (request.type == RRType::NotValid) { |
186 //!!! not yet it doesn't -- have to figure out how to | 253 cerr << "vampipe-server: eof reached" << endl; |
187 //!!! handle this with capnp | 254 break; |
188 if (request.type == RRType::NotValid) break; | 255 } |
189 | 256 |
190 RequestOrResponse response = processRequest(request); | 257 RequestOrResponse response = processRequest(request); |
258 | |
259 cerr << "vampipe-server: request processed, writing response" | |
260 << endl; | |
191 | 261 |
192 writeResponseCapnp(response); | 262 writeResponseCapnp(response); |
263 | |
264 cerr << "vampipe-server: response written" << endl; | |
193 | 265 |
194 } catch (std::exception &e) { | 266 } catch (std::exception &e) { |
195 cerr << "Error: " << e.what() << endl; | 267 cerr << "vampipe-server: error: " << e.what() << endl; |
196 exit(1); | 268 exit(1); |
197 } | 269 } |
198 } | 270 } |
199 | 271 |
200 exit(0); | 272 exit(0); |