annotate src/capnproto-0.6.0/c++/src/kj/async-io-test.c++ @ 62:0994c39f1e94

Cap'n Proto v0.6 + build for OSX
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 22 May 2017 10:01:37 +0100
parents
children
rev   line source
cannam@62 1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
cannam@62 2 // Licensed under the MIT License:
cannam@62 3 //
cannam@62 4 // Permission is hereby granted, free of charge, to any person obtaining a copy
cannam@62 5 // of this software and associated documentation files (the "Software"), to deal
cannam@62 6 // in the Software without restriction, including without limitation the rights
cannam@62 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
cannam@62 8 // copies of the Software, and to permit persons to whom the Software is
cannam@62 9 // furnished to do so, subject to the following conditions:
cannam@62 10 //
cannam@62 11 // The above copyright notice and this permission notice shall be included in
cannam@62 12 // all copies or substantial portions of the Software.
cannam@62 13 //
cannam@62 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
cannam@62 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
cannam@62 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
cannam@62 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
cannam@62 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
cannam@62 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
cannam@62 20 // THE SOFTWARE.
cannam@62 21
cannam@62 22 #include "async-io.h"
cannam@62 23 #include "debug.h"
cannam@62 24 #include <kj/compat/gtest.h>
cannam@62 25 #include <sys/types.h>
cannam@62 26 #if _WIN32
cannam@62 27 #include <ws2tcpip.h>
cannam@62 28 #include "windows-sanity.h"
cannam@62 29 #else
cannam@62 30 #include <netdb.h>
cannam@62 31 #endif
cannam@62 32
cannam@62 33 namespace kj {
cannam@62 34 namespace {
cannam@62 35
cannam@62 36 TEST(AsyncIo, SimpleNetwork) {
cannam@62 37 auto ioContext = setupAsyncIo();
cannam@62 38 auto& network = ioContext.provider->getNetwork();
cannam@62 39
cannam@62 40 Own<ConnectionReceiver> listener;
cannam@62 41 Own<AsyncIoStream> server;
cannam@62 42 Own<AsyncIoStream> client;
cannam@62 43
cannam@62 44 char receiveBuffer[4];
cannam@62 45
cannam@62 46 auto port = newPromiseAndFulfiller<uint>();
cannam@62 47
cannam@62 48 port.promise.then([&](uint portnum) {
cannam@62 49 return network.parseAddress("localhost", portnum);
cannam@62 50 }).then([&](Own<NetworkAddress>&& result) {
cannam@62 51 return result->connect();
cannam@62 52 }).then([&](Own<AsyncIoStream>&& result) {
cannam@62 53 client = kj::mv(result);
cannam@62 54 return client->write("foo", 3);
cannam@62 55 }).detach([](kj::Exception&& exception) {
cannam@62 56 KJ_FAIL_EXPECT(exception);
cannam@62 57 });
cannam@62 58
cannam@62 59 kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) {
cannam@62 60 listener = result->listen();
cannam@62 61 port.fulfiller->fulfill(listener->getPort());
cannam@62 62 return listener->accept();
cannam@62 63 }).then([&](Own<AsyncIoStream>&& result) {
cannam@62 64 server = kj::mv(result);
cannam@62 65 return server->tryRead(receiveBuffer, 3, 4);
cannam@62 66 }).then([&](size_t n) {
cannam@62 67 EXPECT_EQ(3u, n);
cannam@62 68 return heapString(receiveBuffer, n);
cannam@62 69 }).wait(ioContext.waitScope);
cannam@62 70
cannam@62 71 EXPECT_EQ("foo", result);
cannam@62 72 }
cannam@62 73
cannam@62 74 String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) {
cannam@62 75 return network.parseAddress(text, portHint).wait(waitScope)->toString();
cannam@62 76 }
cannam@62 77
bool hasIpv6() {
  // Reports whether getaddrinfo() accepts the IPv6 address "::". Per the original note, that
  // only holds when IPv6 is configured on at least one interface (the loopback interface
  // usually has it even when the others don't... but not always).
  struct addrinfo* results;
  const bool resolved = getaddrinfo("::", nullptr, nullptr, &results) == 0;
  if (resolved) {
    // The list is only handed to us on success, so only then is there something to release.
    freeaddrinfo(results);
  }
  return resolved;
}
cannam@62 90
cannam@62 91 TEST(AsyncIo, AddressParsing) {
cannam@62 92 auto ioContext = setupAsyncIo();
cannam@62 93 auto& w = ioContext.waitScope;
cannam@62 94 auto& network = ioContext.provider->getNetwork();
cannam@62 95
cannam@62 96 EXPECT_EQ("*:0", tryParse(w, network, "*"));
cannam@62 97 EXPECT_EQ("*:123", tryParse(w, network, "*:123"));
cannam@62 98 EXPECT_EQ("0.0.0.0:0", tryParse(w, network, "0.0.0.0"));
cannam@62 99 EXPECT_EQ("1.2.3.4:5678", tryParse(w, network, "1.2.3.4", 5678));
cannam@62 100
cannam@62 101 #if !_WIN32
cannam@62 102 EXPECT_EQ("unix:foo/bar/baz", tryParse(w, network, "unix:foo/bar/baz"));
cannam@62 103 #endif
cannam@62 104
cannam@62 105 // We can parse services by name...
cannam@62 106 #if !__ANDROID__ // Service names not supported on Android for some reason?
cannam@62 107 EXPECT_EQ("1.2.3.4:80", tryParse(w, network, "1.2.3.4:http", 5678));
cannam@62 108 EXPECT_EQ("*:80", tryParse(w, network, "*:http", 5678));
cannam@62 109 #endif
cannam@62 110
cannam@62 111 // IPv6 tests. Annoyingly, these don't work on machines that don't have IPv6 configured on any
cannam@62 112 // interfaces.
cannam@62 113 if (hasIpv6()) {
cannam@62 114 EXPECT_EQ("[::]:123", tryParse(w, network, "0::0", 123));
cannam@62 115 EXPECT_EQ("[12ab:cd::34]:321", tryParse(w, network, "[12ab:cd:0::0:34]:321", 432));
cannam@62 116 #if !__ANDROID__ // Service names not supported on Android for some reason?
cannam@62 117 EXPECT_EQ("[::]:80", tryParse(w, network, "[::]:http", 5678));
cannam@62 118 EXPECT_EQ("[12ab:cd::34]:80", tryParse(w, network, "[12ab:cd::34]:http", 5678));
cannam@62 119 #endif
cannam@62 120 }
cannam@62 121
cannam@62 122 // It would be nice to test DNS lookup here but the test would not be very hermetic. Even
cannam@62 123 // localhost can map to different addresses depending on whether IPv6 is enabled. We do
cannam@62 124 // connect to "localhost" in a different test, though.
cannam@62 125 }
cannam@62 126
cannam@62 127 TEST(AsyncIo, OneWayPipe) {
cannam@62 128 auto ioContext = setupAsyncIo();
cannam@62 129
cannam@62 130 auto pipe = ioContext.provider->newOneWayPipe();
cannam@62 131 char receiveBuffer[4];
cannam@62 132
cannam@62 133 pipe.out->write("foo", 3).detach([](kj::Exception&& exception) {
cannam@62 134 KJ_FAIL_EXPECT(exception);
cannam@62 135 });
cannam@62 136
cannam@62 137 kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) {
cannam@62 138 EXPECT_EQ(3u, n);
cannam@62 139 return heapString(receiveBuffer, n);
cannam@62 140 }).wait(ioContext.waitScope);
cannam@62 141
cannam@62 142 EXPECT_EQ("foo", result);
cannam@62 143 }
cannam@62 144
cannam@62 145 TEST(AsyncIo, TwoWayPipe) {
cannam@62 146 auto ioContext = setupAsyncIo();
cannam@62 147
cannam@62 148 auto pipe = ioContext.provider->newTwoWayPipe();
cannam@62 149 char receiveBuffer1[4];
cannam@62 150 char receiveBuffer2[4];
cannam@62 151
cannam@62 152 auto promise = pipe.ends[0]->write("foo", 3).then([&]() {
cannam@62 153 return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
cannam@62 154 }).then([&](size_t n) {
cannam@62 155 EXPECT_EQ(3u, n);
cannam@62 156 return heapString(receiveBuffer1, n);
cannam@62 157 });
cannam@62 158
cannam@62 159 kj::String result = pipe.ends[1]->write("bar", 3).then([&]() {
cannam@62 160 return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4);
cannam@62 161 }).then([&](size_t n) {
cannam@62 162 EXPECT_EQ(3u, n);
cannam@62 163 return heapString(receiveBuffer2, n);
cannam@62 164 }).wait(ioContext.waitScope);
cannam@62 165
cannam@62 166 kj::String result2 = promise.wait(ioContext.waitScope);
cannam@62 167
cannam@62 168 EXPECT_EQ("foo", result);
cannam@62 169 EXPECT_EQ("bar", result2);
cannam@62 170 }
cannam@62 171
cannam@62 172 TEST(AsyncIo, PipeThread) {
cannam@62 173 auto ioContext = setupAsyncIo();
cannam@62 174
cannam@62 175 auto pipeThread = ioContext.provider->newPipeThread(
cannam@62 176 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) {
cannam@62 177 char buf[4];
cannam@62 178 stream.write("foo", 3).wait(waitScope);
cannam@62 179 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope));
cannam@62 180 EXPECT_EQ("bar", heapString(buf, 3));
cannam@62 181
cannam@62 182 // Expect disconnect.
cannam@62 183 EXPECT_EQ(0, stream.tryRead(buf, 1, 1).wait(waitScope));
cannam@62 184 });
cannam@62 185
cannam@62 186 char buf[4];
cannam@62 187 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope);
cannam@62 188 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope));
cannam@62 189 EXPECT_EQ("foo", heapString(buf, 3));
cannam@62 190 }
cannam@62 191
cannam@62 192 TEST(AsyncIo, PipeThreadDisconnects) {
cannam@62 193 // Like above, but in this case we expect the main thread to detect the pipe thread disconnecting.
cannam@62 194
cannam@62 195 auto ioContext = setupAsyncIo();
cannam@62 196
cannam@62 197 auto pipeThread = ioContext.provider->newPipeThread(
cannam@62 198 [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) {
cannam@62 199 char buf[4];
cannam@62 200 stream.write("foo", 3).wait(waitScope);
cannam@62 201 EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope));
cannam@62 202 EXPECT_EQ("bar", heapString(buf, 3));
cannam@62 203 });
cannam@62 204
cannam@62 205 char buf[4];
cannam@62 206 EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope));
cannam@62 207 EXPECT_EQ("foo", heapString(buf, 3));
cannam@62 208
cannam@62 209 pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope);
cannam@62 210
cannam@62 211 // Expect disconnect.
cannam@62 212 EXPECT_EQ(0, pipeThread.pipe->tryRead(buf, 1, 1).wait(ioContext.waitScope));
cannam@62 213 }
cannam@62 214
cannam@62 215 TEST(AsyncIo, Timeouts) {
cannam@62 216 auto ioContext = setupAsyncIo();
cannam@62 217
cannam@62 218 Timer& timer = ioContext.provider->getTimer();
cannam@62 219
cannam@62 220 auto promise1 = timer.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE));
cannam@62 221 auto promise2 = timer.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123));
cannam@62 222
cannam@62 223 EXPECT_TRUE(promise1.then([]() { return false; }, [](kj::Exception&& e) { return true; })
cannam@62 224 .wait(ioContext.waitScope));
cannam@62 225 EXPECT_EQ(123, promise2.wait(ioContext.waitScope));
cannam@62 226 }
cannam@62 227
cannam@62 228 #if !_WIN32 // datagrams not implemented on win32 yet
cannam@62 229
cannam@62 230 TEST(AsyncIo, Udp) {
cannam@62 231 auto ioContext = setupAsyncIo();
cannam@62 232
cannam@62 233 auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope);
cannam@62 234
cannam@62 235 auto port1 = addr->bindDatagramPort();
cannam@62 236 auto port2 = addr->bindDatagramPort();
cannam@62 237
cannam@62 238 auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort())
cannam@62 239 .wait(ioContext.waitScope);
cannam@62 240 auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort())
cannam@62 241 .wait(ioContext.waitScope);
cannam@62 242
cannam@62 243 Own<NetworkAddress> receivedAddr;
cannam@62 244
cannam@62 245 {
cannam@62 246 // Send a message and receive it.
cannam@62 247 EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope));
cannam@62 248 auto receiver = port2->makeReceiver();
cannam@62 249
cannam@62 250 receiver->receive().wait(ioContext.waitScope);
cannam@62 251 {
cannam@62 252 auto content = receiver->getContent();
cannam@62 253 EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
cannam@62 254 EXPECT_FALSE(content.isTruncated);
cannam@62 255 }
cannam@62 256 receivedAddr = receiver->getSource().clone();
cannam@62 257 EXPECT_EQ(addr1->toString(), receivedAddr->toString());
cannam@62 258 {
cannam@62 259 auto ancillary = receiver->getAncillary();
cannam@62 260 EXPECT_EQ(0, ancillary.value.size());
cannam@62 261 EXPECT_FALSE(ancillary.isTruncated);
cannam@62 262 }
cannam@62 263
cannam@62 264 // Receive a second message with the same receiver.
cannam@62 265 {
cannam@62 266 auto promise = receiver->receive(); // This time, start receiving before sending
cannam@62 267 EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope));
cannam@62 268 promise.wait(ioContext.waitScope);
cannam@62 269 auto content = receiver->getContent();
cannam@62 270 EXPECT_EQ("barbaz", kj::heapString(content.value.asChars()));
cannam@62 271 EXPECT_FALSE(content.isTruncated);
cannam@62 272 }
cannam@62 273 }
cannam@62 274
cannam@62 275 DatagramReceiver::Capacity capacity;
cannam@62 276 capacity.content = 8;
cannam@62 277 capacity.ancillary = 1024;
cannam@62 278
cannam@62 279 {
cannam@62 280 // Send a reply that will be truncated.
cannam@62 281 EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope));
cannam@62 282 auto recv1 = port1->makeReceiver(capacity);
cannam@62 283
cannam@62 284 recv1->receive().wait(ioContext.waitScope);
cannam@62 285 {
cannam@62 286 auto content = recv1->getContent();
cannam@62 287 EXPECT_EQ("01234567", kj::heapString(content.value.asChars()));
cannam@62 288 EXPECT_TRUE(content.isTruncated);
cannam@62 289 }
cannam@62 290 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
cannam@62 291 {
cannam@62 292 auto ancillary = recv1->getAncillary();
cannam@62 293 EXPECT_EQ(0, ancillary.value.size());
cannam@62 294 EXPECT_FALSE(ancillary.isTruncated);
cannam@62 295 }
cannam@62 296
cannam@62 297 #if defined(IP_PKTINFO) && !__CYGWIN__
cannam@62 298 // Set IP_PKTINFO header and try to receive it.
cannam@62 299 // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html
cannam@62 300 // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7.
cannam@62 301 int one = 1;
cannam@62 302 port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one));
cannam@62 303
cannam@62 304 EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope));
cannam@62 305
cannam@62 306 recv1->receive().wait(ioContext.waitScope);
cannam@62 307 {
cannam@62 308 auto content = recv1->getContent();
cannam@62 309 EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
cannam@62 310 EXPECT_FALSE(content.isTruncated);
cannam@62 311 }
cannam@62 312 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
cannam@62 313 {
cannam@62 314 auto ancillary = recv1->getAncillary();
cannam@62 315 EXPECT_FALSE(ancillary.isTruncated);
cannam@62 316 ASSERT_EQ(1, ancillary.value.size());
cannam@62 317
cannam@62 318 auto message = ancillary.value[0];
cannam@62 319 EXPECT_EQ(IPPROTO_IP, message.getLevel());
cannam@62 320 EXPECT_EQ(IP_PKTINFO, message.getType());
cannam@62 321 EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size());
cannam@62 322 auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>());
cannam@62 323 EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr); // 127.0.0.1
cannam@62 324 }
cannam@62 325
cannam@62 326 // See what happens if there's not quite enough space for in_pktinfo.
cannam@62 327 capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8;
cannam@62 328 recv1 = port1->makeReceiver(capacity);
cannam@62 329
cannam@62 330 EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope));
cannam@62 331
cannam@62 332 recv1->receive().wait(ioContext.waitScope);
cannam@62 333 {
cannam@62 334 auto content = recv1->getContent();
cannam@62 335 EXPECT_EQ("bar", kj::heapString(content.value.asChars()));
cannam@62 336 EXPECT_FALSE(content.isTruncated);
cannam@62 337 }
cannam@62 338 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
cannam@62 339 {
cannam@62 340 auto ancillary = recv1->getAncillary();
cannam@62 341 EXPECT_TRUE(ancillary.isTruncated);
cannam@62 342
cannam@62 343 // We might get a message, but it will be truncated.
cannam@62 344 if (ancillary.value.size() != 0) {
cannam@62 345 EXPECT_EQ(1, ancillary.value.size());
cannam@62 346
cannam@62 347 auto message = ancillary.value[0];
cannam@62 348 EXPECT_EQ(IPPROTO_IP, message.getLevel());
cannam@62 349 EXPECT_EQ(IP_PKTINFO, message.getType());
cannam@62 350
cannam@62 351 EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr);
cannam@62 352 EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo));
cannam@62 353 }
cannam@62 354 }
cannam@62 355
cannam@62 356 // See what happens if there's not enough space even for the cmsghdr.
cannam@62 357 capacity.ancillary = CMSG_SPACE(0) - 8;
cannam@62 358 recv1 = port1->makeReceiver(capacity);
cannam@62 359
cannam@62 360 EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope));
cannam@62 361
cannam@62 362 recv1->receive().wait(ioContext.waitScope);
cannam@62 363 {
cannam@62 364 auto content = recv1->getContent();
cannam@62 365 EXPECT_EQ("baz", kj::heapString(content.value.asChars()));
cannam@62 366 EXPECT_FALSE(content.isTruncated);
cannam@62 367 }
cannam@62 368 EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
cannam@62 369 {
cannam@62 370 auto ancillary = recv1->getAncillary();
cannam@62 371 EXPECT_TRUE(ancillary.isTruncated);
cannam@62 372 EXPECT_EQ(0, ancillary.value.size());
cannam@62 373 }
cannam@62 374 #endif
cannam@62 375 }
cannam@62 376 }
cannam@62 377
cannam@62 378 #endif // !_WIN32
cannam@62 379
cannam@62 380 } // namespace
cannam@62 381 } // namespace kj