comparison src/capnproto-0.6.0/c++/src/kj/async-io-test.c++ @ 62:0994c39f1e94

Cap'n Proto v0.6 + build for OSX
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 22 May 2017 10:01:37 +0100
parents
children
comparison
equal deleted inserted replaced
61:d101c4099725 62:0994c39f1e94
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #include "async-io.h"
23 #include "debug.h"
24 #include <kj/compat/gtest.h>
25 #include <sys/types.h>
26 #if _WIN32
27 #include <ws2tcpip.h>
28 #include "windows-sanity.h"
29 #else
30 #include <netdb.h>
31 #endif
32
33 namespace kj {
34 namespace {
35
36 TEST(AsyncIo, SimpleNetwork) {
37 auto ioContext = setupAsyncIo();
38 auto& network = ioContext.provider->getNetwork();
39
40 Own<ConnectionReceiver> listener;
41 Own<AsyncIoStream> server;
42 Own<AsyncIoStream> client;
43
44 char receiveBuffer[4];
45
46 auto port = newPromiseAndFulfiller<uint>();
47
48 port.promise.then([&](uint portnum) {
49 return network.parseAddress("localhost", portnum);
50 }).then([&](Own<NetworkAddress>&& result) {
51 return result->connect();
52 }).then([&](Own<AsyncIoStream>&& result) {
53 client = kj::mv(result);
54 return client->write("foo", 3);
55 }).detach([](kj::Exception&& exception) {
56 KJ_FAIL_EXPECT(exception);
57 });
58
59 kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) {
60 listener = result->listen();
61 port.fulfiller->fulfill(listener->getPort());
62 return listener->accept();
63 }).then([&](Own<AsyncIoStream>&& result) {
64 server = kj::mv(result);
65 return server->tryRead(receiveBuffer, 3, 4);
66 }).then([&](size_t n) {
67 EXPECT_EQ(3u, n);
68 return heapString(receiveBuffer, n);
69 }).wait(ioContext.waitScope);
70
71 EXPECT_EQ("foo", result);
72 }
73
74 String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) {
75 return network.parseAddress(text, portHint).wait(waitScope)->toString();
76 }
77
78 bool hasIpv6() {
79 // Can getaddrinfo() parse ipv6 addresses? This is only true if ipv6 is configured on at least
80 // one interface. (The loopback interface usually has it even if others don't... but not always.)
81 struct addrinfo* list;
82 int status = getaddrinfo("::", nullptr, nullptr, &list);
83 if (status == 0) {
84 freeaddrinfo(list);
85 return true;
86 } else {
87 return false;
88 }
89 }
90
91 TEST(AsyncIo, AddressParsing) {
92 auto ioContext = setupAsyncIo();
93 auto& w = ioContext.waitScope;
94 auto& network = ioContext.provider->getNetwork();
95
96 EXPECT_EQ("*:0", tryParse(w, network, "*"));
97 EXPECT_EQ("*:123", tryParse(w, network, "*:123"));
98 EXPECT_EQ("0.0.0.0:0", tryParse(w, network, "0.0.0.0"));
99 EXPECT_EQ("1.2.3.4:5678", tryParse(w, network, "1.2.3.4", 5678));
100
101 #if !_WIN32
102 EXPECT_EQ("unix:foo/bar/baz", tryParse(w, network, "unix:foo/bar/baz"));
103 #endif
104
105 // We can parse services by name...
106 #if !__ANDROID__ // Service names not supported on Android for some reason?
107 EXPECT_EQ("1.2.3.4:80", tryParse(w, network, "1.2.3.4:http", 5678));
108 EXPECT_EQ("*:80", tryParse(w, network, "*:http", 5678));
109 #endif
110
111 // IPv6 tests. Annoyingly, these don't work on machines that don't have IPv6 configured on any
112 // interfaces.
113 if (hasIpv6()) {
114 EXPECT_EQ("[::]:123", tryParse(w, network, "0::0", 123));
115 EXPECT_EQ("[12ab:cd::34]:321", tryParse(w, network, "[12ab:cd:0::0:34]:321", 432));
116 #if !__ANDROID__ // Service names not supported on Android for some reason?
117 EXPECT_EQ("[::]:80", tryParse(w, network, "[::]:http", 5678));
118 EXPECT_EQ("[12ab:cd::34]:80", tryParse(w, network, "[12ab:cd::34]:http", 5678));
119 #endif
120 }
121
122 // It would be nice to test DNS lookup here but the test would not be very hermetic. Even
123 // localhost can map to different addresses depending on whether IPv6 is enabled. We do
124 // connect to "localhost" in a different test, though.
125 }
126
127 TEST(AsyncIo, OneWayPipe) {
128 auto ioContext = setupAsyncIo();
129
130 auto pipe = ioContext.provider->newOneWayPipe();
131 char receiveBuffer[4];
132
133 pipe.out->write("foo", 3).detach([](kj::Exception&& exception) {
134 KJ_FAIL_EXPECT(exception);
135 });
136
137 kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) {
138 EXPECT_EQ(3u, n);
139 return heapString(receiveBuffer, n);
140 }).wait(ioContext.waitScope);
141
142 EXPECT_EQ("foo", result);
143 }
144
145 TEST(AsyncIo, TwoWayPipe) {
146 auto ioContext = setupAsyncIo();
147
148 auto pipe = ioContext.provider->newTwoWayPipe();
149 char receiveBuffer1[4];
150 char receiveBuffer2[4];
151
152 auto promise = pipe.ends[0]->write("foo", 3).then([&]() {
153 return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
154 }).then([&](size_t n) {
155 EXPECT_EQ(3u, n);
156 return heapString(receiveBuffer1, n);
157 });
158
159 kj::String result = pipe.ends[1]->write("bar", 3).then([&]() {
160 return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4);
161 }).then([&](size_t n) {
162 EXPECT_EQ(3u, n);
163 return heapString(receiveBuffer2, n);
164 }).wait(ioContext.waitScope);
165
166 kj::String result2 = promise.wait(ioContext.waitScope);
167
168 EXPECT_EQ("foo", result);
169 EXPECT_EQ("bar", result2);
170 }
171
172 TEST(AsyncIo, PipeThread) {
173 auto ioContext = setupAsyncIo();
174
175 auto pipeThread = ioContext.provider->newPipeThread(
176 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) {
177 char buf[4];
178 stream.write("foo", 3).wait(waitScope);
179 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope));
180 EXPECT_EQ("bar", heapString(buf, 3));
181
182 // Expect disconnect.
183 EXPECT_EQ(0, stream.tryRead(buf, 1, 1).wait(waitScope));
184 });
185
186 char buf[4];
187 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope);
188 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope));
189 EXPECT_EQ("foo", heapString(buf, 3));
190 }
191
192 TEST(AsyncIo, PipeThreadDisconnects) {
193 // Like above, but in this case we expect the main thread to detect the pipe thread disconnecting.
194
195 auto ioContext = setupAsyncIo();
196
197 auto pipeThread = ioContext.provider->newPipeThread(
198 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) {
199 char buf[4];
200 stream.write("foo", 3).wait(waitScope);
201 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope));
202 EXPECT_EQ("bar", heapString(buf, 3));
203 });
204
205 char buf[4];
206 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope));
207 EXPECT_EQ("foo", heapString(buf, 3));
208
209 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope);
210
211 // Expect disconnect.
212 EXPECT_EQ(0, pipeThread.pipe->tryRead(buf, 1, 1).wait(ioContext.waitScope));
213 }
214
215 TEST(AsyncIo, Timeouts) {
216 auto ioContext = setupAsyncIo();
217
218 Timer& timer = ioContext.provider->getTimer();
219
220 auto promise1 = timer.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE));
221 auto promise2 = timer.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123));
222
223 EXPECT_TRUE(promise1.then([]() { return false; }, [](kj::Exception&& e) { return true; })
224 .wait(ioContext.waitScope));
225 EXPECT_EQ(123, promise2.wait(ioContext.waitScope));
226 }
227
228 #if !_WIN32 // datagrams not implemented on win32 yet
229
230 TEST(AsyncIo, Udp) {
231 auto ioContext = setupAsyncIo();
232
233 auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope);
234
235 auto port1 = addr->bindDatagramPort();
236 auto port2 = addr->bindDatagramPort();
237
238 auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort())
239 .wait(ioContext.waitScope);
240 auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort())
241 .wait(ioContext.waitScope);
242
243 Own<NetworkAddress> receivedAddr;
244
245 {
246 // Send a message and receive it.
247 EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope));
248 auto receiver = port2->makeReceiver();
249
250 receiver->receive().wait(ioContext.waitScope);
251 {
252 auto content = receiver->getContent();
253 EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
254 EXPECT_FALSE(content.isTruncated);
255 }
256 receivedAddr = receiver->getSource().clone();
257 EXPECT_EQ(addr1->toString(), receivedAddr->toString());
258 {
259 auto ancillary = receiver->getAncillary();
260 EXPECT_EQ(0, ancillary.value.size());
261 EXPECT_FALSE(ancillary.isTruncated);
262 }
263
264 // Receive a second message with the same receiver.
265 {
266 auto promise = receiver->receive(); // This time, start receiving before sending
267 EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope));
268 promise.wait(ioContext.waitScope);
269 auto content = receiver->getContent();
270 EXPECT_EQ("barbaz", kj::heapString(content.value.asChars()));
271 EXPECT_FALSE(content.isTruncated);
272 }
273 }
274
275 DatagramReceiver::Capacity capacity;
276 capacity.content = 8;
277 capacity.ancillary = 1024;
278
279 {
280 // Send a reply that will be truncated.
281 EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope));
282 auto recv1 = port1->makeReceiver(capacity);
283
284 recv1->receive().wait(ioContext.waitScope);
285 {
286 auto content = recv1->getContent();
287 EXPECT_EQ("01234567", kj::heapString(content.value.asChars()));
288 EXPECT_TRUE(content.isTruncated);
289 }
290 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
291 {
292 auto ancillary = recv1->getAncillary();
293 EXPECT_EQ(0, ancillary.value.size());
294 EXPECT_FALSE(ancillary.isTruncated);
295 }
296
297 #if defined(IP_PKTINFO) && !__CYGWIN__
298 // Set IP_PKTINFO header and try to receive it.
299 // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html
300 // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7.
301 int one = 1;
302 port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one));
303
304 EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope));
305
306 recv1->receive().wait(ioContext.waitScope);
307 {
308 auto content = recv1->getContent();
309 EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
310 EXPECT_FALSE(content.isTruncated);
311 }
312 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
313 {
314 auto ancillary = recv1->getAncillary();
315 EXPECT_FALSE(ancillary.isTruncated);
316 ASSERT_EQ(1, ancillary.value.size());
317
318 auto message = ancillary.value[0];
319 EXPECT_EQ(IPPROTO_IP, message.getLevel());
320 EXPECT_EQ(IP_PKTINFO, message.getType());
321 EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size());
322 auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>());
323 EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr); // 127.0.0.1
324 }
325
326 // See what happens if there's not quite enough space for in_pktinfo.
327 capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8;
328 recv1 = port1->makeReceiver(capacity);
329
330 EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope));
331
332 recv1->receive().wait(ioContext.waitScope);
333 {
334 auto content = recv1->getContent();
335 EXPECT_EQ("bar", kj::heapString(content.value.asChars()));
336 EXPECT_FALSE(content.isTruncated);
337 }
338 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
339 {
340 auto ancillary = recv1->getAncillary();
341 EXPECT_TRUE(ancillary.isTruncated);
342
343 // We might get a message, but it will be truncated.
344 if (ancillary.value.size() != 0) {
345 EXPECT_EQ(1, ancillary.value.size());
346
347 auto message = ancillary.value[0];
348 EXPECT_EQ(IPPROTO_IP, message.getLevel());
349 EXPECT_EQ(IP_PKTINFO, message.getType());
350
351 EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr);
352 EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo));
353 }
354 }
355
356 // See what happens if there's not enough space even for the cmsghdr.
357 capacity.ancillary = CMSG_SPACE(0) - 8;
358 recv1 = port1->makeReceiver(capacity);
359
360 EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope));
361
362 recv1->receive().wait(ioContext.waitScope);
363 {
364 auto content = recv1->getContent();
365 EXPECT_EQ("baz", kj::heapString(content.value.asChars()));
366 EXPECT_FALSE(content.isTruncated);
367 }
368 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
369 {
370 auto ancillary = recv1->getAncillary();
371 EXPECT_TRUE(ancillary.isTruncated);
372 EXPECT_EQ(0, ancillary.value.size());
373 }
374 #endif
375 }
376 }
377
378 #endif // !_WIN32
379
380 } // namespace
381 } // namespace kj