Mercurial > hg > sv-dependency-builds
comparison src/capnproto-0.6.0/c++/src/kj/async-io-test.c++ @ 62:0994c39f1e94
Cap'n Proto v0.6 + build for OSX
| author | Chris Cannam <cannam@all-day-breakfast.com> |
|---|---|
| date | Mon, 22 May 2017 10:01:37 +0100 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 61:d101c4099725 | 62:0994c39f1e94 |
|---|---|
| 1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
| 2 // Licensed under the MIT License: | |
| 3 // | |
| 4 // Permission is hereby granted, free of charge, to any person obtaining a copy | |
| 5 // of this software and associated documentation files (the "Software"), to deal | |
| 6 // in the Software without restriction, including without limitation the rights | |
| 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| 8 // copies of the Software, and to permit persons to whom the Software is | |
| 9 // furnished to do so, subject to the following conditions: | |
| 10 // | |
| 11 // The above copyright notice and this permission notice shall be included in | |
| 12 // all copies or substantial portions of the Software. | |
| 13 // | |
| 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
| 20 // THE SOFTWARE. | |
| 21 | |
| 22 #include "async-io.h" | |
| 23 #include "debug.h" | |
| 24 #include <kj/compat/gtest.h> | |
| 25 #include <sys/types.h> | |
| 26 #if _WIN32 | |
| 27 #include <ws2tcpip.h> | |
| 28 #include "windows-sanity.h" | |
| 29 #else | |
| 30 #include <netdb.h> | |
| 31 #endif | |
| 32 | |
| 33 namespace kj { | |
| 34 namespace { | |
| 35 | |
| 36 TEST(AsyncIo, SimpleNetwork) { | |
| 37 // Loopback smoke test: a client connects to a listener and sends "foo" for the server side to read. | |
| 38 auto io = setupAsyncIo(); | |
| 39 auto& net = io.provider->getNetwork(); | |
| 40 // Declared up front because the continuations below capture and assign them by reference. | |
| 41 Own<ConnectionReceiver> acceptor; | |
| 42 Own<AsyncIoStream> serverEnd; | |
| 43 Own<AsyncIoStream> clientEnd; | |
| 44 char inbox[4]; | |
| 45 // The listening side hands its port number to the client side through this pair. | |
| 46 auto boundPort = newPromiseAndFulfiller<uint>(); | |
| 47 // Client chain (detached): wait for the port, connect to "localhost", write "foo". | |
| 48 boundPort.promise.then([&](uint portnum) { | |
| 49 return net.parseAddress("localhost", portnum); | |
| 50 }).then([&](Own<NetworkAddress>&& address) { | |
| 51 return address->connect(); | |
| 52 }).then([&](Own<AsyncIoStream>&& stream) { | |
| 53 clientEnd = kj::mv(stream); | |
| 54 return clientEnd->write("foo", 3); | |
| 55 }).detach([](kj::Exception&& exception) { | |
| 56 KJ_FAIL_EXPECT(exception); | |
| 57 }); | |
| 58 // Server chain: listen on "*", publish the port, accept one connection, read into the buffer. | |
| 59 kj::String result = net.parseAddress("*").then([&](Own<NetworkAddress>&& address) { | |
| 60 acceptor = address->listen(); | |
| 61 boundPort.fulfiller->fulfill(acceptor->getPort()); | |
| 62 return acceptor->accept(); | |
| 63 }).then([&](Own<AsyncIoStream>&& stream) { | |
| 64 serverEnd = kj::mv(stream); | |
| 65 return serverEnd->tryRead(inbox, 3, 4); | |
| 66 }).then([&](size_t n) { | |
| 67 EXPECT_EQ(3u, n); | |
| 68 return heapString(inbox, n); | |
| 69 }).wait(io.waitScope); | |
| 70 | |
| 71 EXPECT_EQ("foo", result); | |
| 72 } | |
| 73 // Parses `text` (using `portHint`) on `network`, waits for the result, and returns its string form. | |
| 74 String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) { | |
| 75 auto address = network.parseAddress(text, portHint).wait(waitScope); | |
| 76 return address->toString(); | |
| 77 } | |
| 78 bool hasIpv6() { | |
| 79 // Reports whether getaddrinfo() can parse the IPv6 address "::". That is only the case when | |
| 80 // IPv6 is configured on at least one interface (loopback usually has it, but not always). | |
| 81 struct addrinfo* results; | |
| 82 if (getaddrinfo("::", nullptr, nullptr, &results) != 0) { | |
| 83 // Lookup failed: nothing is freed on this path. | |
| 84 return false; | |
| 85 } | |
| 86 // Only the success/failure outcome matters, so the result list is released right away. | |
| 87 freeaddrinfo(results); | |
| 88 return true; | |
| 89 } | |
| 90 | |
| 91 TEST(AsyncIo, AddressParsing) { | |
| 92 auto io = setupAsyncIo(); | |
| 93 auto& net = io.provider->getNetwork(); | |
| 94 auto& ws = io.waitScope; | |
| 95 // Wildcard and dotted-quad forms; the port hint is expected to appear only when the text names no port. | |
| 96 EXPECT_EQ("*:0", tryParse(ws, net, "*")); | |
| 97 EXPECT_EQ("*:123", tryParse(ws, net, "*:123")); | |
| 98 EXPECT_EQ("0.0.0.0:0", tryParse(ws, net, "0.0.0.0")); | |
| 99 EXPECT_EQ("1.2.3.4:5678", tryParse(ws, net, "1.2.3.4", 5678)); | |
| 100 | |
| 101 #if !_WIN32 | |
| 102 EXPECT_EQ("unix:foo/bar/baz", tryParse(ws, net, "unix:foo/bar/baz")); | |
| 103 #endif | |
| 104 | |
| 105 // A service name after the colon is expected to resolve to its port, taking priority over the hint. | |
| 106 #if !__ANDROID__ // Service names not supported on Android for some reason? | |
| 107 EXPECT_EQ("1.2.3.4:80", tryParse(ws, net, "1.2.3.4:http", 5678)); | |
| 108 EXPECT_EQ("*:80", tryParse(ws, net, "*:http", 5678)); | |
| 109 #endif | |
| 110 | |
| 111 // The IPv6 cases run only when hasIpv6() reports support, because they do not work on | |
| 112 // machines that have no IPv6-configured interface. | |
| 113 if (hasIpv6()) { | |
| 114 EXPECT_EQ("[::]:123", tryParse(ws, net, "0::0", 123)); | |
| 115 EXPECT_EQ("[12ab:cd::34]:321", tryParse(ws, net, "[12ab:cd:0::0:34]:321", 432)); | |
| 116 #if !__ANDROID__ // Service names not supported on Android for some reason? | |
| 117 EXPECT_EQ("[::]:80", tryParse(ws, net, "[::]:http", 5678)); | |
| 118 EXPECT_EQ("[12ab:cd::34]:80", tryParse(ws, net, "[12ab:cd::34]:http", 5678)); | |
| 119 #endif | |
| 120 } | |
| 121 | |
| 122 // DNS lookup is deliberately not covered here: the test would not be very hermetic, since | |
| 123 // even localhost can map to different addresses depending on whether IPv6 is enabled. A | |
| 124 // different test does connect to "localhost", though. | |
| 125 } | |
| 126 | |
| 127 TEST(AsyncIo, OneWayPipe) { | |
| 128 // Writes "foo" into the output end of a one-way pipe and reads it back from the input end. | |
| 129 auto io = setupAsyncIo(); | |
| 130 auto oneWay = io.provider->newOneWayPipe(); | |
| 131 char inbox[4]; | |
| 132 | |
| 133 // The write is detached; if it fails, the failure is reported as a failed expectation. | |
| 134 oneWay.out->write("foo", 3).detach([](kj::Exception&& exception) { | |
| 135 KJ_FAIL_EXPECT(exception); | |
| 136 }); | |
| 137 auto readPromise = oneWay.in->tryRead(inbox, 3, 4).then([&](size_t n) { | |
| 138 EXPECT_EQ(3u, n); | |
| 139 return heapString(inbox, n); | |
| 140 }); | |
| 141 kj::String result = readPromise.wait(io.waitScope); | |
| 142 EXPECT_EQ("foo", result); | |
| 143 } | |
| 144 | |
| 145 TEST(AsyncIo, TwoWayPipe) { | |
| 146 // Each end of a two-way pipe writes three bytes and then reads what the opposite end wrote. | |
| 147 auto io = setupAsyncIo(); | |
| 148 auto duplex = io.provider->newTwoWayPipe(); | |
| 149 char inbox0[4]; | |
| 150 char inbox1[4]; | |
| 151 | |
| 152 // End 0 sends "foo" and then reads; this chain is waited on second. | |
| 153 auto promise = duplex.ends[0]->write("foo", 3).then([&]() { | |
| 154 return duplex.ends[0]->tryRead(inbox0, 3, 4); | |
| 155 }).then([&](size_t n) { | |
| 156 EXPECT_EQ(3u, n); | |
| 157 return heapString(inbox0, n); | |
| 158 }); | |
| 159 // End 1 sends "bar" and then reads; this chain is waited on first. | |
| 160 kj::String result = duplex.ends[1]->write("bar", 3).then([&]() { | |
| 161 return duplex.ends[1]->tryRead(inbox1, 3, 4); | |
| 162 }).then([&](size_t n) { | |
| 163 EXPECT_EQ(3u, n); | |
| 164 return heapString(inbox1, n); | |
| 165 }).wait(io.waitScope); | |
| 166 kj::String result2 = promise.wait(io.waitScope); | |
| 167 // `result` is what end 1 read (end 0's "foo"); `result2` is what end 0 read (end 1's "bar"). | |
| 168 EXPECT_EQ("foo", result); | |
| 169 EXPECT_EQ("bar", result2); | |
| 170 } | |
| 171 | |
| 172 TEST(AsyncIo, PipeThread) { | |
| 173 // Swaps "foo" and "bar" with a pipe thread; the thread side then expects a disconnect. | |
| 174 auto io = setupAsyncIo(); | |
| 175 auto worker = io.provider->newPipeThread( | |
| 176 [](AsyncIoProvider& ioProvider, AsyncIoStream& peer, WaitScope& threadScope) { | |
| 177 // Thread side: send "foo", then expect "bar" from the main thread. | |
| 178 char incoming[4]; | |
| 179 peer.write("foo", 3).wait(threadScope); | |
| 180 EXPECT_EQ(3u, peer.tryRead(incoming, 3, 4).wait(threadScope)); | |
| 181 EXPECT_EQ("bar", heapString(incoming, 3)); | |
| 182 // Expect disconnect: the next read should report zero bytes. | |
| 183 EXPECT_EQ(0, peer.tryRead(incoming, 1, 1).wait(threadScope)); | |
| 184 }); | |
| 185 // Main side: send "bar", then expect the thread's "foo". | |
| 186 char reply[4]; | |
| 187 worker.pipe->write("bar", 3).wait(io.waitScope); | |
| 188 EXPECT_EQ(3u, worker.pipe->tryRead(reply, 3, 4).wait(io.waitScope)); | |
| 189 EXPECT_EQ("foo", heapString(reply, 3)); | |
| 190 } | |
| 191 | |
| 192 TEST(AsyncIo, PipeThreadDisconnects) { | |
| 193 // Variant of the PipeThread test in which the main thread is the side expected to detect | |
| 194 // the pipe thread disconnecting. | |
| 195 auto io = setupAsyncIo(); | |
| 196 | |
| 197 auto worker = io.provider->newPipeThread( | |
| 198 [](AsyncIoProvider& ioProvider, AsyncIoStream& peer, WaitScope& threadScope) { | |
| 199 char incoming[4]; | |
| 200 peer.write("foo", 3).wait(threadScope); | |
| 201 EXPECT_EQ(3u, peer.tryRead(incoming, 3, 4).wait(threadScope)); | |
| 202 EXPECT_EQ("bar", heapString(incoming, 3)); | |
| 203 }); | |
| 204 | |
| 205 // Main side reads the thread's "foo" first, then answers with "bar". | |
| 206 char reply[4]; | |
| 207 EXPECT_EQ(3u, worker.pipe->tryRead(reply, 3, 4).wait(io.waitScope)); | |
| 208 EXPECT_EQ("foo", heapString(reply, 3)); | |
| 209 worker.pipe->write("bar", 3).wait(io.waitScope); | |
| 210 | |
| 211 // Expect disconnect: the next read should report zero bytes. | |
| 212 EXPECT_EQ(0, worker.pipe->tryRead(reply, 1, 1).wait(io.waitScope)); | |
| 213 } | |
| 214 | |
| 215 TEST(AsyncIo, Timeouts) { | |
| 216 auto io = setupAsyncIo(); | |
| 217 Timer& clock = io.provider->getTimer(); | |
| 218 | |
| 219 // A promise that never completes is expected to be rejected under its 10ms timeout... | |
| 220 auto neverDone = clock.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE)); | |
| 221 // ...while an already-resolved promise is expected to yield its value under a 100ms timeout. | |
| 222 auto immediate = clock.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123)); | |
| 223 // Map success to false and rejection to true so the expectation reads "did it time out?". | |
| 224 auto timedOut = neverDone.then([]() { return false; }, [](kj::Exception&& e) { return true; }); | |
| 225 EXPECT_TRUE(timedOut.wait(io.waitScope)); | |
| 226 EXPECT_EQ(123, immediate.wait(io.waitScope)); | |
| 227 } | |
| 227 | |
| 228 #if !_WIN32 // datagrams not implemented on win32 yet | |
| 229 | |
| 230 TEST(AsyncIo, Udp) { | |
| 231 // Exchanges datagrams between two ports bound on 127.0.0.1: plain delivery, truncated | |
| 232 // content, and (where IP_PKTINFO is available) ancillary data under shrinking capacity. | |
| 233 auto io = setupAsyncIo(); | |
| 234 auto& ws = io.waitScope; | |
| 235 auto& net = io.provider->getNetwork(); | |
| 236 auto loopback = net.parseAddress("127.0.0.1").wait(ws); | |
| 237 | |
| 238 auto portA = loopback->bindDatagramPort(); | |
| 239 auto portB = loopback->bindDatagramPort(); | |
| 240 auto addrA = net.parseAddress("127.0.0.1", portA->getPort()).wait(ws); | |
| 241 auto addrB = net.parseAddress("127.0.0.1", portB->getPort()).wait(ws); | |
| 242 | |
| 243 Own<NetworkAddress> replyTo; | |
| 244 | |
| 245 { | |
| 246 // First exchange: A sends "foo" to B, and B's receiver is only created afterwards. | |
| 247 EXPECT_EQ(3, portA->send("foo", 3, *addrB).wait(ws)); | |
| 248 auto inboxB = portB->makeReceiver(); | |
| 249 | |
| 250 inboxB->receive().wait(ws); | |
| 251 { | |
| 252 auto payload = inboxB->getContent(); | |
| 253 EXPECT_EQ("foo", kj::heapString(payload.value.asChars())); | |
| 254 EXPECT_FALSE(payload.isTruncated); | |
| 255 } | |
| 256 replyTo = inboxB->getSource().clone(); | |
| 257 EXPECT_EQ(addrA->toString(), replyTo->toString()); | |
| 258 { | |
| 259 auto extras = inboxB->getAncillary(); | |
| 260 EXPECT_EQ(0, extras.value.size()); | |
| 261 EXPECT_FALSE(extras.isTruncated); | |
| 262 } | |
| 263 | |
| 264 // Second exchange reuses the receiver; this time the receive is started before the send. | |
| 265 { | |
| 266 auto pending = inboxB->receive(); | |
| 267 EXPECT_EQ(6, portA->send("barbaz", 6, *addrB).wait(ws)); | |
| 268 pending.wait(ws); | |
| 269 auto payload = inboxB->getContent(); | |
| 270 EXPECT_EQ("barbaz", kj::heapString(payload.value.asChars())); | |
| 271 EXPECT_FALSE(payload.isTruncated); | |
| 272 } | |
| 273 } | |
| 274 | |
| 275 DatagramReceiver::Capacity limits; | |
| 276 limits.content = 8; | |
| 277 limits.ancillary = 1024; | |
| 278 | |
| 279 { | |
| 280 // B replies to the captured source address with 16 bytes; content capacity is only 8. | |
| 281 EXPECT_EQ(16, portB->send("0123456789abcdef", 16, *replyTo).wait(ws)); | |
| 282 auto inboxA = portA->makeReceiver(limits); | |
| 283 | |
| 284 inboxA->receive().wait(ws); | |
| 285 { | |
| 286 auto payload = inboxA->getContent(); | |
| 287 EXPECT_EQ("01234567", kj::heapString(payload.value.asChars())); | |
| 288 EXPECT_TRUE(payload.isTruncated); | |
| 289 } | |
| 290 EXPECT_EQ(addrB->toString(), inboxA->getSource().toString()); | |
| 291 { | |
| 292 auto extras = inboxA->getAncillary(); | |
| 293 EXPECT_EQ(0, extras.value.size()); | |
| 294 EXPECT_FALSE(extras.isTruncated); | |
| 295 } | |
| 296 | |
| 297 #if defined(IP_PKTINFO) && !__CYGWIN__ | |
| 298 // Turn on the IP_PKTINFO option for A and check that it arrives as ancillary data. | |
| 299 // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html | |
| 300 // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7. | |
| 301 int enable = 1; | |
| 302 portA->setsockopt(IPPROTO_IP, IP_PKTINFO, &enable, sizeof(enable)); | |
| 303 | |
| 304 EXPECT_EQ(3, portB->send("foo", 3, *addrA).wait(ws)); | |
| 305 | |
| 306 inboxA->receive().wait(ws); | |
| 307 { | |
| 308 auto payload = inboxA->getContent(); | |
| 309 EXPECT_EQ("foo", kj::heapString(payload.value.asChars())); | |
| 310 EXPECT_FALSE(payload.isTruncated); | |
| 311 } | |
| 312 EXPECT_EQ(addrB->toString(), inboxA->getSource().toString()); | |
| 313 { | |
| 314 auto extras = inboxA->getAncillary(); | |
| 315 EXPECT_FALSE(extras.isTruncated); | |
| 316 ASSERT_EQ(1, extras.value.size()); | |
| 317 | |
| 318 auto cmsg = extras.value[0]; | |
| 319 EXPECT_EQ(IPPROTO_IP, cmsg.getLevel()); | |
| 320 EXPECT_EQ(IP_PKTINFO, cmsg.getType()); | |
| 321 EXPECT_EQ(sizeof(struct in_pktinfo), cmsg.asArray<byte>().size()); | |
| 322 auto& info = KJ_ASSERT_NONNULL(cmsg.as<struct in_pktinfo>()); | |
| 323 EXPECT_EQ(htonl(0x7F000001), info.ipi_addr.s_addr); // 127.0.0.1 | |
| 324 } | |
| 325 | |
| 326 // Next case: ancillary capacity 8 bytes short of what an in_pktinfo message occupies. | |
| 327 limits.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8; | |
| 328 inboxA = portA->makeReceiver(limits); | |
| 329 | |
| 330 EXPECT_EQ(3, portB->send("bar", 3, *addrA).wait(ws)); | |
| 331 | |
| 332 inboxA->receive().wait(ws); | |
| 333 { | |
| 334 auto payload = inboxA->getContent(); | |
| 335 EXPECT_EQ("bar", kj::heapString(payload.value.asChars())); | |
| 336 EXPECT_FALSE(payload.isTruncated); | |
| 337 } | |
| 338 EXPECT_EQ(addrB->toString(), inboxA->getSource().toString()); | |
| 339 { | |
| 340 auto extras = inboxA->getAncillary(); | |
| 341 EXPECT_TRUE(extras.isTruncated); | |
| 342 | |
| 343 // A message may or may not be reported; if one is, it must be too short to decode. | |
| 344 if (extras.value.size() > 0) { | |
| 345 EXPECT_EQ(1, extras.value.size()); | |
| 346 | |
| 347 auto cmsg = extras.value[0]; | |
| 348 EXPECT_EQ(IPPROTO_IP, cmsg.getLevel()); | |
| 349 EXPECT_EQ(IP_PKTINFO, cmsg.getType()); | |
| 350 | |
| 351 EXPECT_TRUE(cmsg.as<struct in_pktinfo>() == nullptr); | |
| 352 EXPECT_LT(cmsg.asArray<byte>().size(), sizeof(struct in_pktinfo)); | |
| 353 } | |
| 354 } | |
| 355 | |
| 356 // Final case: ancillary capacity too small even for the cmsghdr itself. | |
| 357 limits.ancillary = CMSG_SPACE(0) - 8; | |
| 358 inboxA = portA->makeReceiver(limits); | |
| 359 | |
| 360 EXPECT_EQ(3, portB->send("baz", 3, *addrA).wait(ws)); | |
| 361 | |
| 362 inboxA->receive().wait(ws); | |
| 363 { | |
| 364 auto payload = inboxA->getContent(); | |
| 365 EXPECT_EQ("baz", kj::heapString(payload.value.asChars())); | |
| 366 EXPECT_FALSE(payload.isTruncated); | |
| 367 } | |
| 368 EXPECT_EQ(addrB->toString(), inboxA->getSource().toString()); | |
| 369 { | |
| 370 auto extras = inboxA->getAncillary(); | |
| 371 EXPECT_TRUE(extras.isTruncated); | |
| 372 EXPECT_EQ(0, extras.value.size()); | |
| 373 } | |
| 374 #endif | |
| 375 } | |
| 376 } | |
| 377 | |
| 378 #endif // !_WIN32 | |
| 379 | |
| 380 } // namespace | |
| 381 } // namespace kj |
