Mercurial > hg > sv-dependency-builds
comparison src/capnproto-0.6.0/c++/src/kj/async-io-test.c++ @ 62:0994c39f1e94
Cap'n Proto v0.6 + build for OSX
author | Chris Cannam <cannam@all-day-breakfast.com> |
---|---|
date | Mon, 22 May 2017 10:01:37 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
61:d101c4099725 | 62:0994c39f1e94 |
---|---|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
2 // Licensed under the MIT License: | |
3 // | |
4 // Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 // of this software and associated documentation files (the "Software"), to deal | |
6 // in the Software without restriction, including without limitation the rights | |
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 // copies of the Software, and to permit persons to whom the Software is | |
9 // furnished to do so, subject to the following conditions: | |
10 // | |
11 // The above copyright notice and this permission notice shall be included in | |
12 // all copies or substantial portions of the Software. | |
13 // | |
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
20 // THE SOFTWARE. | |
21 | |
22 #include "async-io.h" | |
23 #include "debug.h" | |
24 #include <kj/compat/gtest.h> | |
25 #include <sys/types.h> | |
26 #if _WIN32 | |
27 #include <ws2tcpip.h> | |
28 #include "windows-sanity.h" | |
29 #else | |
30 #include <netdb.h> | |
31 #endif | |
32 | |
33 namespace kj { | |
34 namespace { | |
35 | |
36 TEST(AsyncIo, SimpleNetwork) { | |
37 auto ioContext = setupAsyncIo(); | |
38 auto& network = ioContext.provider->getNetwork(); | |
39 | |
40 Own<ConnectionReceiver> listener; | |
41 Own<AsyncIoStream> server; | |
42 Own<AsyncIoStream> client; | |
43 | |
44 char receiveBuffer[4]; | |
45 | |
46 auto port = newPromiseAndFulfiller<uint>(); | |
47 | |
48 port.promise.then([&](uint portnum) { | |
49 return network.parseAddress("localhost", portnum); | |
50 }).then([&](Own<NetworkAddress>&& result) { | |
51 return result->connect(); | |
52 }).then([&](Own<AsyncIoStream>&& result) { | |
53 client = kj::mv(result); | |
54 return client->write("foo", 3); | |
55 }).detach([](kj::Exception&& exception) { | |
56 KJ_FAIL_EXPECT(exception); | |
57 }); | |
58 | |
59 kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) { | |
60 listener = result->listen(); | |
61 port.fulfiller->fulfill(listener->getPort()); | |
62 return listener->accept(); | |
63 }).then([&](Own<AsyncIoStream>&& result) { | |
64 server = kj::mv(result); | |
65 return server->tryRead(receiveBuffer, 3, 4); | |
66 }).then([&](size_t n) { | |
67 EXPECT_EQ(3u, n); | |
68 return heapString(receiveBuffer, n); | |
69 }).wait(ioContext.waitScope); | |
70 | |
71 EXPECT_EQ("foo", result); | |
72 } | |
73 | |
74 String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) { | |
75 return network.parseAddress(text, portHint).wait(waitScope)->toString(); | |
76 } | |
77 | |
bool hasIpv6() {
  // Reports whether getaddrinfo() accepts the IPv6 literal "::". Per the original note, that
  // only holds when IPv6 is configured on at least one interface (loopback usually has it,
  // but not always).
  struct addrinfo* results;
  if (getaddrinfo("::", nullptr, nullptr, &results) != 0) {
    return false;
  }

  // Lookup succeeded; release the result list before reporting success.
  freeaddrinfo(results);
  return true;
}
90 | |
91 TEST(AsyncIo, AddressParsing) { | |
92 auto ioContext = setupAsyncIo(); | |
93 auto& w = ioContext.waitScope; | |
94 auto& network = ioContext.provider->getNetwork(); | |
95 | |
96 EXPECT_EQ("*:0", tryParse(w, network, "*")); | |
97 EXPECT_EQ("*:123", tryParse(w, network, "*:123")); | |
98 EXPECT_EQ("0.0.0.0:0", tryParse(w, network, "0.0.0.0")); | |
99 EXPECT_EQ("1.2.3.4:5678", tryParse(w, network, "1.2.3.4", 5678)); | |
100 | |
101 #if !_WIN32 | |
102 EXPECT_EQ("unix:foo/bar/baz", tryParse(w, network, "unix:foo/bar/baz")); | |
103 #endif | |
104 | |
105 // We can parse services by name... | |
106 #if !__ANDROID__ // Service names not supported on Android for some reason? | |
107 EXPECT_EQ("1.2.3.4:80", tryParse(w, network, "1.2.3.4:http", 5678)); | |
108 EXPECT_EQ("*:80", tryParse(w, network, "*:http", 5678)); | |
109 #endif | |
110 | |
111 // IPv6 tests. Annoyingly, these don't work on machines that don't have IPv6 configured on any | |
112 // interfaces. | |
113 if (hasIpv6()) { | |
114 EXPECT_EQ("[::]:123", tryParse(w, network, "0::0", 123)); | |
115 EXPECT_EQ("[12ab:cd::34]:321", tryParse(w, network, "[12ab:cd:0::0:34]:321", 432)); | |
116 #if !__ANDROID__ // Service names not supported on Android for some reason? | |
117 EXPECT_EQ("[::]:80", tryParse(w, network, "[::]:http", 5678)); | |
118 EXPECT_EQ("[12ab:cd::34]:80", tryParse(w, network, "[12ab:cd::34]:http", 5678)); | |
119 #endif | |
120 } | |
121 | |
122 // It would be nice to test DNS lookup here but the test would not be very hermetic. Even | |
123 // localhost can map to different addresses depending on whether IPv6 is enabled. We do | |
124 // connect to "localhost" in a different test, though. | |
125 } | |
126 | |
127 TEST(AsyncIo, OneWayPipe) { | |
128 auto ioContext = setupAsyncIo(); | |
129 | |
130 auto pipe = ioContext.provider->newOneWayPipe(); | |
131 char receiveBuffer[4]; | |
132 | |
133 pipe.out->write("foo", 3).detach([](kj::Exception&& exception) { | |
134 KJ_FAIL_EXPECT(exception); | |
135 }); | |
136 | |
137 kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) { | |
138 EXPECT_EQ(3u, n); | |
139 return heapString(receiveBuffer, n); | |
140 }).wait(ioContext.waitScope); | |
141 | |
142 EXPECT_EQ("foo", result); | |
143 } | |
144 | |
145 TEST(AsyncIo, TwoWayPipe) { | |
146 auto ioContext = setupAsyncIo(); | |
147 | |
148 auto pipe = ioContext.provider->newTwoWayPipe(); | |
149 char receiveBuffer1[4]; | |
150 char receiveBuffer2[4]; | |
151 | |
152 auto promise = pipe.ends[0]->write("foo", 3).then([&]() { | |
153 return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); | |
154 }).then([&](size_t n) { | |
155 EXPECT_EQ(3u, n); | |
156 return heapString(receiveBuffer1, n); | |
157 }); | |
158 | |
159 kj::String result = pipe.ends[1]->write("bar", 3).then([&]() { | |
160 return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4); | |
161 }).then([&](size_t n) { | |
162 EXPECT_EQ(3u, n); | |
163 return heapString(receiveBuffer2, n); | |
164 }).wait(ioContext.waitScope); | |
165 | |
166 kj::String result2 = promise.wait(ioContext.waitScope); | |
167 | |
168 EXPECT_EQ("foo", result); | |
169 EXPECT_EQ("bar", result2); | |
170 } | |
171 | |
172 TEST(AsyncIo, PipeThread) { | |
173 auto ioContext = setupAsyncIo(); | |
174 | |
175 auto pipeThread = ioContext.provider->newPipeThread( | |
176 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { | |
177 char buf[4]; | |
178 stream.write("foo", 3).wait(waitScope); | |
179 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); | |
180 EXPECT_EQ("bar", heapString(buf, 3)); | |
181 | |
182 // Expect disconnect. | |
183 EXPECT_EQ(0, stream.tryRead(buf, 1, 1).wait(waitScope)); | |
184 }); | |
185 | |
186 char buf[4]; | |
187 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); | |
188 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); | |
189 EXPECT_EQ("foo", heapString(buf, 3)); | |
190 } | |
191 | |
192 TEST(AsyncIo, PipeThreadDisconnects) { | |
193 // Like above, but in this case we expect the main thread to detect the pipe thread disconnecting. | |
194 | |
195 auto ioContext = setupAsyncIo(); | |
196 | |
197 auto pipeThread = ioContext.provider->newPipeThread( | |
198 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { | |
199 char buf[4]; | |
200 stream.write("foo", 3).wait(waitScope); | |
201 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); | |
202 EXPECT_EQ("bar", heapString(buf, 3)); | |
203 }); | |
204 | |
205 char buf[4]; | |
206 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); | |
207 EXPECT_EQ("foo", heapString(buf, 3)); | |
208 | |
209 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); | |
210 | |
211 // Expect disconnect. | |
212 EXPECT_EQ(0, pipeThread.pipe->tryRead(buf, 1, 1).wait(ioContext.waitScope)); | |
213 } | |
214 | |
215 TEST(AsyncIo, Timeouts) { | |
216 auto ioContext = setupAsyncIo(); | |
217 | |
218 Timer& timer = ioContext.provider->getTimer(); | |
219 | |
220 auto promise1 = timer.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE)); | |
221 auto promise2 = timer.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123)); | |
222 | |
223 EXPECT_TRUE(promise1.then([]() { return false; }, [](kj::Exception&& e) { return true; }) | |
224 .wait(ioContext.waitScope)); | |
225 EXPECT_EQ(123, promise2.wait(ioContext.waitScope)); | |
226 } | |
227 | |
228 #if !_WIN32 // datagrams not implemented on win32 yet | |
229 | |
230 TEST(AsyncIo, Udp) { | |
231 auto ioContext = setupAsyncIo(); | |
232 | |
233 auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope); | |
234 | |
235 auto port1 = addr->bindDatagramPort(); | |
236 auto port2 = addr->bindDatagramPort(); | |
237 | |
238 auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort()) | |
239 .wait(ioContext.waitScope); | |
240 auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort()) | |
241 .wait(ioContext.waitScope); | |
242 | |
243 Own<NetworkAddress> receivedAddr; | |
244 | |
245 { | |
246 // Send a message and receive it. | |
247 EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope)); | |
248 auto receiver = port2->makeReceiver(); | |
249 | |
250 receiver->receive().wait(ioContext.waitScope); | |
251 { | |
252 auto content = receiver->getContent(); | |
253 EXPECT_EQ("foo", kj::heapString(content.value.asChars())); | |
254 EXPECT_FALSE(content.isTruncated); | |
255 } | |
256 receivedAddr = receiver->getSource().clone(); | |
257 EXPECT_EQ(addr1->toString(), receivedAddr->toString()); | |
258 { | |
259 auto ancillary = receiver->getAncillary(); | |
260 EXPECT_EQ(0, ancillary.value.size()); | |
261 EXPECT_FALSE(ancillary.isTruncated); | |
262 } | |
263 | |
264 // Receive a second message with the same receiver. | |
265 { | |
266 auto promise = receiver->receive(); // This time, start receiving before sending | |
267 EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope)); | |
268 promise.wait(ioContext.waitScope); | |
269 auto content = receiver->getContent(); | |
270 EXPECT_EQ("barbaz", kj::heapString(content.value.asChars())); | |
271 EXPECT_FALSE(content.isTruncated); | |
272 } | |
273 } | |
274 | |
275 DatagramReceiver::Capacity capacity; | |
276 capacity.content = 8; | |
277 capacity.ancillary = 1024; | |
278 | |
279 { | |
280 // Send a reply that will be truncated. | |
281 EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope)); | |
282 auto recv1 = port1->makeReceiver(capacity); | |
283 | |
284 recv1->receive().wait(ioContext.waitScope); | |
285 { | |
286 auto content = recv1->getContent(); | |
287 EXPECT_EQ("01234567", kj::heapString(content.value.asChars())); | |
288 EXPECT_TRUE(content.isTruncated); | |
289 } | |
290 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); | |
291 { | |
292 auto ancillary = recv1->getAncillary(); | |
293 EXPECT_EQ(0, ancillary.value.size()); | |
294 EXPECT_FALSE(ancillary.isTruncated); | |
295 } | |
296 | |
297 #if defined(IP_PKTINFO) && !__CYGWIN__ | |
298 // Set IP_PKTINFO header and try to receive it. | |
299 // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html | |
300 // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7. | |
301 int one = 1; | |
302 port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one)); | |
303 | |
304 EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope)); | |
305 | |
306 recv1->receive().wait(ioContext.waitScope); | |
307 { | |
308 auto content = recv1->getContent(); | |
309 EXPECT_EQ("foo", kj::heapString(content.value.asChars())); | |
310 EXPECT_FALSE(content.isTruncated); | |
311 } | |
312 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); | |
313 { | |
314 auto ancillary = recv1->getAncillary(); | |
315 EXPECT_FALSE(ancillary.isTruncated); | |
316 ASSERT_EQ(1, ancillary.value.size()); | |
317 | |
318 auto message = ancillary.value[0]; | |
319 EXPECT_EQ(IPPROTO_IP, message.getLevel()); | |
320 EXPECT_EQ(IP_PKTINFO, message.getType()); | |
321 EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size()); | |
322 auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>()); | |
323 EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr); // 127.0.0.1 | |
324 } | |
325 | |
326 // See what happens if there's not quite enough space for in_pktinfo. | |
327 capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8; | |
328 recv1 = port1->makeReceiver(capacity); | |
329 | |
330 EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope)); | |
331 | |
332 recv1->receive().wait(ioContext.waitScope); | |
333 { | |
334 auto content = recv1->getContent(); | |
335 EXPECT_EQ("bar", kj::heapString(content.value.asChars())); | |
336 EXPECT_FALSE(content.isTruncated); | |
337 } | |
338 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); | |
339 { | |
340 auto ancillary = recv1->getAncillary(); | |
341 EXPECT_TRUE(ancillary.isTruncated); | |
342 | |
343 // We might get a message, but it will be truncated. | |
344 if (ancillary.value.size() != 0) { | |
345 EXPECT_EQ(1, ancillary.value.size()); | |
346 | |
347 auto message = ancillary.value[0]; | |
348 EXPECT_EQ(IPPROTO_IP, message.getLevel()); | |
349 EXPECT_EQ(IP_PKTINFO, message.getType()); | |
350 | |
351 EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr); | |
352 EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo)); | |
353 } | |
354 } | |
355 | |
356 // See what happens if there's not enough space even for the cmsghdr. | |
357 capacity.ancillary = CMSG_SPACE(0) - 8; | |
358 recv1 = port1->makeReceiver(capacity); | |
359 | |
360 EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope)); | |
361 | |
362 recv1->receive().wait(ioContext.waitScope); | |
363 { | |
364 auto content = recv1->getContent(); | |
365 EXPECT_EQ("baz", kj::heapString(content.value.asChars())); | |
366 EXPECT_FALSE(content.isTruncated); | |
367 } | |
368 EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); | |
369 { | |
370 auto ancillary = recv1->getAncillary(); | |
371 EXPECT_TRUE(ancillary.isTruncated); | |
372 EXPECT_EQ(0, ancillary.value.size()); | |
373 } | |
374 #endif | |
375 } | |
376 } | |
377 | |
378 #endif // !_WIN32 | |
379 | |
380 } // namespace | |
381 } // namespace kj |