cannam@62
|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
|
cannam@62
|
2 // Licensed under the MIT License:
|
cannam@62
|
3 //
|
cannam@62
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
cannam@62
|
5 // of this software and associated documentation files (the "Software"), to deal
|
cannam@62
|
6 // in the Software without restriction, including without limitation the rights
|
cannam@62
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
cannam@62
|
8 // copies of the Software, and to permit persons to whom the Software is
|
cannam@62
|
9 // furnished to do so, subject to the following conditions:
|
cannam@62
|
10 //
|
cannam@62
|
11 // The above copyright notice and this permission notice shall be included in
|
cannam@62
|
12 // all copies or substantial portions of the Software.
|
cannam@62
|
13 //
|
cannam@62
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
cannam@62
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
cannam@62
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
cannam@62
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
cannam@62
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
cannam@62
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
cannam@62
|
20 // THE SOFTWARE.
|
cannam@62
|
21
|
cannam@62
|
22 #include "rpc-twoparty.h"
|
cannam@62
|
23 #include "test-util.h"
|
cannam@62
|
24 #include <capnp/rpc.capnp.h>
|
cannam@62
|
25 #include <kj/debug.h>
|
cannam@62
|
26 #include <kj/thread.h>
|
cannam@62
|
27 #include <kj/compat/gtest.h>
|
cannam@62
|
28
|
cannam@62
|
29 // TODO(cleanup): Auto-generate stringification functions for union discriminants.
|
cannam@62
|
30 namespace capnp {
|
cannam@62
|
31 namespace rpc {
|
cannam@62
|
32 inline kj::String KJ_STRINGIFY(Message::Which which) {
|
cannam@62
|
33 return kj::str(static_cast<uint16_t>(which));
|
cannam@62
|
34 }
|
cannam@62
|
35 } // namespace rpc
|
cannam@62
|
36 } // namespace capnp
|
cannam@62
|
37
|
cannam@62
|
38 namespace capnp {
|
cannam@62
|
39 namespace _ {
|
cannam@62
|
40 namespace {
|
cannam@62
|
41
|
cannam@62
|
42 class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
|
cannam@62
|
43 public:
|
cannam@62
|
44 TestRestorer(int& callCount, int& handleCount)
|
cannam@62
|
45 : callCount(callCount), handleCount(handleCount) {}
|
cannam@62
|
46
|
cannam@62
|
47 Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
|
cannam@62
|
48 switch (objectId.getTag()) {
|
cannam@62
|
49 case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE:
|
cannam@62
|
50 return kj::heap<TestInterfaceImpl>(callCount);
|
cannam@62
|
51 case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS:
|
cannam@62
|
52 return Capability::Client(newBrokenCap("No TestExtends implemented."));
|
cannam@62
|
53 case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE:
|
cannam@62
|
54 return kj::heap<TestPipelineImpl>(callCount);
|
cannam@62
|
55 case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLEE:
|
cannam@62
|
56 return kj::heap<TestTailCalleeImpl>(callCount);
|
cannam@62
|
57 case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER:
|
cannam@62
|
58 return kj::heap<TestTailCallerImpl>(callCount);
|
cannam@62
|
59 case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF:
|
cannam@62
|
60 return kj::heap<TestMoreStuffImpl>(callCount, handleCount);
|
cannam@62
|
61 }
|
cannam@62
|
62 KJ_UNREACHABLE;
|
cannam@62
|
63 }
|
cannam@62
|
64
|
cannam@62
|
65 private:
|
cannam@62
|
66 int& callCount;
|
cannam@62
|
67 int& handleCount;
|
cannam@62
|
68 };
|
cannam@62
|
69
|
cannam@62
|
70 kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider,
|
cannam@62
|
71 int& callCount, int& handleCount) {
|
cannam@62
|
72 return ioProvider.newPipeThread(
|
cannam@62
|
73 [&callCount, &handleCount](
|
cannam@62
|
74 kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) {
|
cannam@62
|
75 TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER);
|
cannam@62
|
76 TestRestorer restorer(callCount, handleCount);
|
cannam@62
|
77 auto server = makeRpcServer(network, restorer);
|
cannam@62
|
78 network.onDisconnect().wait(waitScope);
|
cannam@62
|
79 });
|
cannam@62
|
80 }
|
cannam@62
|
81
|
cannam@62
|
82 Capability::Client getPersistentCap(RpcSystem<rpc::twoparty::VatId>& client,
|
cannam@62
|
83 rpc::twoparty::Side side,
|
cannam@62
|
84 test::TestSturdyRefObjectId::Tag tag) {
|
cannam@62
|
85 // Create the VatId.
|
cannam@62
|
86 MallocMessageBuilder hostIdMessage(8);
|
cannam@62
|
87 auto hostId = hostIdMessage.initRoot<rpc::twoparty::VatId>();
|
cannam@62
|
88 hostId.setSide(side);
|
cannam@62
|
89
|
cannam@62
|
90 // Create the SturdyRefObjectId.
|
cannam@62
|
91 MallocMessageBuilder objectIdMessage(8);
|
cannam@62
|
92 objectIdMessage.initRoot<test::TestSturdyRefObjectId>().setTag(tag);
|
cannam@62
|
93
|
cannam@62
|
94 // Connect to the remote capability.
|
cannam@62
|
95 return client.restore(hostId, objectIdMessage.getRoot<AnyPointer>());
|
cannam@62
|
96 }
|
cannam@62
|
97
|
cannam@62
|
98 TEST(TwoPartyNetwork, Basic) {
|
cannam@62
|
99 auto ioContext = kj::setupAsyncIo();
|
cannam@62
|
100 int callCount = 0;
|
cannam@62
|
101 int handleCount = 0;
|
cannam@62
|
102
|
cannam@62
|
103 auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
|
cannam@62
|
104 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
|
cannam@62
|
105 auto rpcClient = makeRpcClient(network);
|
cannam@62
|
106
|
cannam@62
|
107 // Request the particular capability from the server.
|
cannam@62
|
108 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
|
cannam@62
|
109 test::TestSturdyRefObjectId::Tag::TEST_INTERFACE).castAs<test::TestInterface>();
|
cannam@62
|
110
|
cannam@62
|
111 // Use the capability.
|
cannam@62
|
112 auto request1 = client.fooRequest();
|
cannam@62
|
113 request1.setI(123);
|
cannam@62
|
114 request1.setJ(true);
|
cannam@62
|
115 auto promise1 = request1.send();
|
cannam@62
|
116
|
cannam@62
|
117 auto request2 = client.bazRequest();
|
cannam@62
|
118 initTestMessage(request2.initS());
|
cannam@62
|
119 auto promise2 = request2.send();
|
cannam@62
|
120
|
cannam@62
|
121 bool barFailed = false;
|
cannam@62
|
122 auto request3 = client.barRequest();
|
cannam@62
|
123 auto promise3 = request3.send().then(
|
cannam@62
|
124 [](Response<test::TestInterface::BarResults>&& response) {
|
cannam@62
|
125 ADD_FAILURE() << "Expected bar() call to fail.";
|
cannam@62
|
126 }, [&](kj::Exception&& e) {
|
cannam@62
|
127 barFailed = true;
|
cannam@62
|
128 });
|
cannam@62
|
129
|
cannam@62
|
130 EXPECT_EQ(0, callCount);
|
cannam@62
|
131
|
cannam@62
|
132 auto response1 = promise1.wait(ioContext.waitScope);
|
cannam@62
|
133
|
cannam@62
|
134 EXPECT_EQ("foo", response1.getX());
|
cannam@62
|
135
|
cannam@62
|
136 auto response2 = promise2.wait(ioContext.waitScope);
|
cannam@62
|
137
|
cannam@62
|
138 promise3.wait(ioContext.waitScope);
|
cannam@62
|
139
|
cannam@62
|
140 EXPECT_EQ(2, callCount);
|
cannam@62
|
141 EXPECT_TRUE(barFailed);
|
cannam@62
|
142 }
|
cannam@62
|
143
|
cannam@62
|
144 TEST(TwoPartyNetwork, Pipelining) {
|
cannam@62
|
145 auto ioContext = kj::setupAsyncIo();
|
cannam@62
|
146 int callCount = 0;
|
cannam@62
|
147 int handleCount = 0;
|
cannam@62
|
148 int reverseCallCount = 0; // Calls back from server to client.
|
cannam@62
|
149
|
cannam@62
|
150 auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
|
cannam@62
|
151 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
|
cannam@62
|
152 auto rpcClient = makeRpcClient(network);
|
cannam@62
|
153
|
cannam@62
|
154 bool disconnected = false;
|
cannam@62
|
155 kj::Promise<void> disconnectPromise = network.onDisconnect().then([&]() { disconnected = true; });
|
cannam@62
|
156
|
cannam@62
|
157 {
|
cannam@62
|
158 // Request the particular capability from the server.
|
cannam@62
|
159 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
|
cannam@62
|
160 test::TestSturdyRefObjectId::Tag::TEST_PIPELINE).castAs<test::TestPipeline>();
|
cannam@62
|
161
|
cannam@62
|
162 {
|
cannam@62
|
163 // Use the capability.
|
cannam@62
|
164 auto request = client.getCapRequest();
|
cannam@62
|
165 request.setN(234);
|
cannam@62
|
166 request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount));
|
cannam@62
|
167
|
cannam@62
|
168 auto promise = request.send();
|
cannam@62
|
169
|
cannam@62
|
170 auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
|
cannam@62
|
171 pipelineRequest.setI(321);
|
cannam@62
|
172 auto pipelinePromise = pipelineRequest.send();
|
cannam@62
|
173
|
cannam@62
|
174 auto pipelineRequest2 = promise.getOutBox().getCap()
|
cannam@62
|
175 .castAs<test::TestExtends>().graultRequest();
|
cannam@62
|
176 auto pipelinePromise2 = pipelineRequest2.send();
|
cannam@62
|
177
|
cannam@62
|
178 promise = nullptr; // Just to be annoying, drop the original promise.
|
cannam@62
|
179
|
cannam@62
|
180 EXPECT_EQ(0, callCount);
|
cannam@62
|
181 EXPECT_EQ(0, reverseCallCount);
|
cannam@62
|
182
|
cannam@62
|
183 auto response = pipelinePromise.wait(ioContext.waitScope);
|
cannam@62
|
184 EXPECT_EQ("bar", response.getX());
|
cannam@62
|
185
|
cannam@62
|
186 auto response2 = pipelinePromise2.wait(ioContext.waitScope);
|
cannam@62
|
187 checkTestMessage(response2);
|
cannam@62
|
188
|
cannam@62
|
189 EXPECT_EQ(3, callCount);
|
cannam@62
|
190 EXPECT_EQ(1, reverseCallCount);
|
cannam@62
|
191 }
|
cannam@62
|
192
|
cannam@62
|
193 EXPECT_FALSE(disconnected);
|
cannam@62
|
194
|
cannam@62
|
195 // What if we disconnect?
|
cannam@62
|
196 serverThread.pipe->shutdownWrite();
|
cannam@62
|
197
|
cannam@62
|
198 // The other side should also disconnect.
|
cannam@62
|
199 disconnectPromise.wait(ioContext.waitScope);
|
cannam@62
|
200
|
cannam@62
|
201 {
|
cannam@62
|
202 // Use the now-broken capability.
|
cannam@62
|
203 auto request = client.getCapRequest();
|
cannam@62
|
204 request.setN(234);
|
cannam@62
|
205 request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount));
|
cannam@62
|
206
|
cannam@62
|
207 auto promise = request.send();
|
cannam@62
|
208
|
cannam@62
|
209 auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
|
cannam@62
|
210 pipelineRequest.setI(321);
|
cannam@62
|
211 auto pipelinePromise = pipelineRequest.send();
|
cannam@62
|
212
|
cannam@62
|
213 auto pipelineRequest2 = promise.getOutBox().getCap()
|
cannam@62
|
214 .castAs<test::TestExtends>().graultRequest();
|
cannam@62
|
215 auto pipelinePromise2 = pipelineRequest2.send();
|
cannam@62
|
216
|
cannam@62
|
217 EXPECT_ANY_THROW(pipelinePromise.wait(ioContext.waitScope));
|
cannam@62
|
218 EXPECT_ANY_THROW(pipelinePromise2.wait(ioContext.waitScope));
|
cannam@62
|
219
|
cannam@62
|
220 EXPECT_EQ(3, callCount);
|
cannam@62
|
221 EXPECT_EQ(1, reverseCallCount);
|
cannam@62
|
222 }
|
cannam@62
|
223 }
|
cannam@62
|
224 }
|
cannam@62
|
225
|
cannam@62
|
226 TEST(TwoPartyNetwork, Release) {
|
cannam@62
|
227 auto ioContext = kj::setupAsyncIo();
|
cannam@62
|
228 int callCount = 0;
|
cannam@62
|
229 int handleCount = 0;
|
cannam@62
|
230
|
cannam@62
|
231 auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
|
cannam@62
|
232 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
|
cannam@62
|
233 auto rpcClient = makeRpcClient(network);
|
cannam@62
|
234
|
cannam@62
|
235 // Request the particular capability from the server.
|
cannam@62
|
236 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
|
cannam@62
|
237 test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>();
|
cannam@62
|
238
|
cannam@62
|
239 auto handle1 = client.getHandleRequest().send().wait(ioContext.waitScope).getHandle();
|
cannam@62
|
240 auto promise = client.getHandleRequest().send();
|
cannam@62
|
241 auto handle2 = promise.wait(ioContext.waitScope).getHandle();
|
cannam@62
|
242
|
cannam@62
|
243 EXPECT_EQ(2, handleCount);
|
cannam@62
|
244
|
cannam@62
|
245 handle1 = nullptr;
|
cannam@62
|
246
|
cannam@62
|
247 // There once was a bug where the last outgoing message (and any capabilities attached) would
|
cannam@62
|
248 // not get cleaned up (until a new message was sent). This appeared to be a bug in Release,
|
cannam@62
|
249 // becaues if a client received a message and then released a capability from it but then did
|
cannam@62
|
250 // not make any further calls, then the capability would not be released because the message
|
cannam@62
|
251 // introducing it remained the last server -> client message (because a "Release" message has
|
cannam@62
|
252 // no reply). Here we are explicitly trying to catch this bug. This proves tricky, because when
|
cannam@62
|
253 // we drop a reference on the client side, there's no particular way to wait for the release
|
cannam@62
|
254 // message to reach the server except to make a subsequent call and wait for the return -- but
|
cannam@62
|
255 // that would mask the bug. So, we wait spin waiting for handleCount to change.
|
cannam@62
|
256
|
cannam@62
|
257 uint maxSpins = 1000;
|
cannam@62
|
258
|
cannam@62
|
259 while (handleCount > 1) {
|
cannam@62
|
260 ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
|
cannam@62
|
261 KJ_ASSERT(--maxSpins > 0);
|
cannam@62
|
262 }
|
cannam@62
|
263 EXPECT_EQ(1, handleCount);
|
cannam@62
|
264
|
cannam@62
|
265 handle2 = nullptr;
|
cannam@62
|
266
|
cannam@62
|
267 ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
|
cannam@62
|
268 EXPECT_EQ(1, handleCount);
|
cannam@62
|
269
|
cannam@62
|
270 promise = nullptr;
|
cannam@62
|
271
|
cannam@62
|
272 while (handleCount > 0) {
|
cannam@62
|
273 ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
|
cannam@62
|
274 KJ_ASSERT(--maxSpins > 0);
|
cannam@62
|
275 }
|
cannam@62
|
276 EXPECT_EQ(0, handleCount);
|
cannam@62
|
277 }
|
cannam@62
|
278
|
cannam@62
|
279 TEST(TwoPartyNetwork, Abort) {
|
cannam@62
|
280 // Verify that aborts are received.
|
cannam@62
|
281
|
cannam@62
|
282 auto ioContext = kj::setupAsyncIo();
|
cannam@62
|
283 int callCount = 0;
|
cannam@62
|
284 int handleCount = 0;
|
cannam@62
|
285
|
cannam@62
|
286 auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
|
cannam@62
|
287 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
|
cannam@62
|
288
|
cannam@62
|
289 MallocMessageBuilder refMessage(128);
|
cannam@62
|
290 auto hostId = refMessage.initRoot<rpc::twoparty::VatId>();
|
cannam@62
|
291 hostId.setSide(rpc::twoparty::Side::SERVER);
|
cannam@62
|
292
|
cannam@62
|
293 auto conn = KJ_ASSERT_NONNULL(network.connect(hostId));
|
cannam@62
|
294
|
cannam@62
|
295 {
|
cannam@62
|
296 // Send an invalid message (Return to non-existent question).
|
cannam@62
|
297 auto msg = conn->newOutgoingMessage(128);
|
cannam@62
|
298 auto body = msg->getBody().initAs<rpc::Message>().initReturn();
|
cannam@62
|
299 body.setAnswerId(1234);
|
cannam@62
|
300 body.setCanceled();
|
cannam@62
|
301 msg->send();
|
cannam@62
|
302 }
|
cannam@62
|
303
|
cannam@62
|
304 auto reply = KJ_ASSERT_NONNULL(conn->receiveIncomingMessage().wait(ioContext.waitScope));
|
cannam@62
|
305 EXPECT_EQ(rpc::Message::ABORT, reply->getBody().getAs<rpc::Message>().which());
|
cannam@62
|
306
|
cannam@62
|
307 EXPECT_TRUE(conn->receiveIncomingMessage().wait(ioContext.waitScope) == nullptr);
|
cannam@62
|
308 }
|
cannam@62
|
309
|
cannam@62
|
310 TEST(TwoPartyNetwork, ConvenienceClasses) {
|
cannam@62
|
311 auto ioContext = kj::setupAsyncIo();
|
cannam@62
|
312
|
cannam@62
|
313 int callCount = 0;
|
cannam@62
|
314 TwoPartyServer server(kj::heap<TestInterfaceImpl>(callCount));
|
cannam@62
|
315
|
cannam@62
|
316 auto address = ioContext.provider->getNetwork()
|
cannam@62
|
317 .parseAddress("127.0.0.1").wait(ioContext.waitScope);
|
cannam@62
|
318
|
cannam@62
|
319 auto listener = address->listen();
|
cannam@62
|
320 auto listenPromise = server.listen(*listener);
|
cannam@62
|
321
|
cannam@62
|
322 address = ioContext.provider->getNetwork()
|
cannam@62
|
323 .parseAddress("127.0.0.1", listener->getPort()).wait(ioContext.waitScope);
|
cannam@62
|
324
|
cannam@62
|
325 auto connection = address->connect().wait(ioContext.waitScope);
|
cannam@62
|
326 TwoPartyClient client(*connection);
|
cannam@62
|
327 auto cap = client.bootstrap().castAs<test::TestInterface>();
|
cannam@62
|
328
|
cannam@62
|
329 auto request = cap.fooRequest();
|
cannam@62
|
330 request.setI(123);
|
cannam@62
|
331 request.setJ(true);
|
cannam@62
|
332 EXPECT_EQ(0, callCount);
|
cannam@62
|
333 auto response = request.send().wait(ioContext.waitScope);
|
cannam@62
|
334 EXPECT_EQ("foo", response.getX());
|
cannam@62
|
335 EXPECT_EQ(1, callCount);
|
cannam@62
|
336 }
|
cannam@62
|
337
|
cannam@62
|
338 TEST(TwoPartyNetwork, HugeMessage) {
|
cannam@62
|
339 auto ioContext = kj::setupAsyncIo();
|
cannam@62
|
340 int callCount = 0;
|
cannam@62
|
341 int handleCount = 0;
|
cannam@62
|
342
|
cannam@62
|
343 auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
|
cannam@62
|
344 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
|
cannam@62
|
345 auto rpcClient = makeRpcClient(network);
|
cannam@62
|
346
|
cannam@62
|
347 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
|
cannam@62
|
348 test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>();
|
cannam@62
|
349
|
cannam@62
|
350 // Oversized request fails.
|
cannam@62
|
351 {
|
cannam@62
|
352 auto req = client.methodWithDefaultsRequest();
|
cannam@62
|
353 req.initA(100000000); // 100 MB
|
cannam@62
|
354
|
cannam@62
|
355 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than the single-message size limit",
|
cannam@62
|
356 req.send().ignoreResult().wait(ioContext.waitScope));
|
cannam@62
|
357 }
|
cannam@62
|
358
|
cannam@62
|
359 // Oversized response fails.
|
cannam@62
|
360 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than the single-message size limit",
|
cannam@62
|
361 client.getEnormousStringRequest().send().ignoreResult().wait(ioContext.waitScope));
|
cannam@62
|
362
|
cannam@62
|
363 // Connection is still up.
|
cannam@62
|
364 {
|
cannam@62
|
365 auto req = client.getCallSequenceRequest();
|
cannam@62
|
366 req.setExpected(0);
|
cannam@62
|
367 KJ_EXPECT(req.send().wait(ioContext.waitScope).getN() == 0);
|
cannam@62
|
368 }
|
cannam@62
|
369 }
|
cannam@62
|
370
|
cannam@62
|
371 class TestAuthenticatedBootstrapImpl final
|
cannam@62
|
372 : public test::TestAuthenticatedBootstrap<rpc::twoparty::VatId>::Server {
|
cannam@62
|
373 public:
|
cannam@62
|
374 TestAuthenticatedBootstrapImpl(rpc::twoparty::VatId::Reader clientId) {
|
cannam@62
|
375 this->clientId.setRoot(clientId);
|
cannam@62
|
376 }
|
cannam@62
|
377
|
cannam@62
|
378 protected:
|
cannam@62
|
379 kj::Promise<void> getCallerId(GetCallerIdContext context) override {
|
cannam@62
|
380 context.getResults().setCaller(clientId.getRoot<rpc::twoparty::VatId>());
|
cannam@62
|
381 return kj::READY_NOW;
|
cannam@62
|
382 }
|
cannam@62
|
383
|
cannam@62
|
384 private:
|
cannam@62
|
385 MallocMessageBuilder clientId;
|
cannam@62
|
386 };
|
cannam@62
|
387
|
cannam@62
|
388 class TestBootstrapFactory: public BootstrapFactory<rpc::twoparty::VatId> {
|
cannam@62
|
389 public:
|
cannam@62
|
390 Capability::Client createFor(rpc::twoparty::VatId::Reader clientId) {
|
cannam@62
|
391 called = true;
|
cannam@62
|
392 EXPECT_EQ(rpc::twoparty::Side::CLIENT, clientId.getSide());
|
cannam@62
|
393 return kj::heap<TestAuthenticatedBootstrapImpl>(clientId);
|
cannam@62
|
394 }
|
cannam@62
|
395
|
cannam@62
|
396 bool called = false;
|
cannam@62
|
397 };
|
cannam@62
|
398
|
cannam@62
|
399 kj::AsyncIoProvider::PipeThread runAuthenticatingServer(
|
cannam@62
|
400 kj::AsyncIoProvider& ioProvider, BootstrapFactory<rpc::twoparty::VatId>& bootstrapFactory) {
|
cannam@62
|
401 return ioProvider.newPipeThread([&bootstrapFactory](
|
cannam@62
|
402 kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) {
|
cannam@62
|
403 TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER);
|
cannam@62
|
404 auto server = makeRpcServer(network, bootstrapFactory);
|
cannam@62
|
405 network.onDisconnect().wait(waitScope);
|
cannam@62
|
406 });
|
cannam@62
|
407 }
|
cannam@62
|
408
|
cannam@62
|
409 TEST(TwoPartyNetwork, BootstrapFactory) {
|
cannam@62
|
410 auto ioContext = kj::setupAsyncIo();
|
cannam@62
|
411 TestBootstrapFactory bootstrapFactory;
|
cannam@62
|
412 auto serverThread = runAuthenticatingServer(*ioContext.provider, bootstrapFactory);
|
cannam@62
|
413 TwoPartyClient client(*serverThread.pipe);
|
cannam@62
|
414 auto resp = client.bootstrap().castAs<test::TestAuthenticatedBootstrap<rpc::twoparty::VatId>>()
|
cannam@62
|
415 .getCallerIdRequest().send().wait(ioContext.waitScope);
|
cannam@62
|
416 EXPECT_EQ(rpc::twoparty::Side::CLIENT, resp.getCaller().getSide());
|
cannam@62
|
417 EXPECT_TRUE(bootstrapFactory.called);
|
cannam@62
|
418 }
|
cannam@62
|
419
|
cannam@62
|
420 } // namespace
|
cannam@62
|
421 } // namespace _
|
cannam@62
|
422 } // namespace capnp
|