annotate src/capnproto-0.6.0/c++/src/kj/async-unix-test.c++ @ 169:223a55898ab9 tip default

Add null config files
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 02 Mar 2020 14:03:47 +0000
parents 45360b968bf4
children
rev   line source
cannam@147 1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
cannam@147 2 // Licensed under the MIT License:
cannam@147 3 //
cannam@147 4 // Permission is hereby granted, free of charge, to any person obtaining a copy
cannam@147 5 // of this software and associated documentation files (the "Software"), to deal
cannam@147 6 // in the Software without restriction, including without limitation the rights
cannam@147 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
cannam@147 8 // copies of the Software, and to permit persons to whom the Software is
cannam@147 9 // furnished to do so, subject to the following conditions:
cannam@147 10 //
cannam@147 11 // The above copyright notice and this permission notice shall be included in
cannam@147 12 // all copies or substantial portions of the Software.
cannam@147 13 //
cannam@147 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
cannam@147 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
cannam@147 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
cannam@147 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
cannam@147 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
cannam@147 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
cannam@147 20 // THE SOFTWARE.
cannam@147 21
cannam@147 22 #if !_WIN32
cannam@147 23
cannam@147 24 #include "async-unix.h"
cannam@147 25 #include "thread.h"
cannam@147 26 #include "debug.h"
cannam@147 27 #include "io.h"
cannam@147 28 #include <unistd.h>
cannam@147 29 #include <fcntl.h>
cannam@147 30 #include <sys/types.h>
cannam@147 31 #include <sys/socket.h>
cannam@147 32 #include <sys/stat.h>
cannam@147 33 #include <netinet/in.h>
cannam@147 34 #include <kj/compat/gtest.h>
cannam@147 35 #include <pthread.h>
cannam@147 36 #include <algorithm>
cannam@147 37
cannam@147 38 namespace kj {
cannam@147 39 namespace {
cannam@147 40
// Suspends the calling thread for 10000 microseconds (10ms) via usleep().
inline void delay() {
  usleep(10000);
}
cannam@147 42
// EXPECT_SI_CODE(expected, actual) compares a siginfo_t's si_code against the expected value on
// Linux and Cygwin. On OSX, si_code seems to be zero when SI_USER is expected, so on every other
// platform the check expands to nothing.
#if __linux__ || __CYGWIN__
#define EXPECT_SI_CODE(expected, actual) EXPECT_EQ(expected, actual)
#else
#define EXPECT_SI_CODE(expected, actual)
#endif
cannam@147 49
cannam@147 50 void captureSignals() {
cannam@147 51 static bool captured = false;
cannam@147 52 if (!captured) {
cannam@147 53 captured = true;
cannam@147 54
cannam@147 55 // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
cannam@147 56 // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test. We can't
cannam@147 57 // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
cannam@147 58 UnixEventPort::captureSignal(SIGURG);
cannam@147 59 UnixEventPort::captureSignal(SIGIO);
cannam@147 60 }
cannam@147 61 }
cannam@147 62
cannam@147 63 TEST(AsyncUnixTest, Signals) {
cannam@147 64 captureSignals();
cannam@147 65 UnixEventPort port;
cannam@147 66 EventLoop loop(port);
cannam@147 67 WaitScope waitScope(loop);
cannam@147 68
cannam@147 69 kill(getpid(), SIGURG);
cannam@147 70
cannam@147 71 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
cannam@147 72 EXPECT_EQ(SIGURG, info.si_signo);
cannam@147 73 EXPECT_SI_CODE(SI_USER, info.si_code);
cannam@147 74 }
cannam@147 75
cannam@147 76 #if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
cannam@147 77 TEST(AsyncUnixTest, SignalWithValue) {
cannam@147 78 // This tests that if we use sigqueue() to attach a value to the signal, that value is received
cannam@147 79 // correctly. Note that this only works on platforms that support real-time signals -- even
cannam@147 80 // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
cannam@147 81 // signals. Hence this test won't run on e.g. Mac OSX.
cannam@147 82 //
cannam@147 83 // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
cannam@147 84 //
cannam@147 85 // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
cannam@147 86 // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
cannam@147 87 // Sad. https://github.com/sandstorm-io/capnproto/issues/204
cannam@147 88
cannam@147 89 captureSignals();
cannam@147 90 UnixEventPort port;
cannam@147 91 EventLoop loop(port);
cannam@147 92 WaitScope waitScope(loop);
cannam@147 93
cannam@147 94 union sigval value;
cannam@147 95 memset(&value, 0, sizeof(value));
cannam@147 96 value.sival_int = 123;
cannam@147 97 sigqueue(getpid(), SIGURG, value);
cannam@147 98
cannam@147 99 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
cannam@147 100 EXPECT_EQ(SIGURG, info.si_signo);
cannam@147 101 EXPECT_SI_CODE(SI_QUEUE, info.si_code);
cannam@147 102 EXPECT_EQ(123, info.si_value.sival_int);
cannam@147 103 }
cannam@147 104
cannam@147 105 TEST(AsyncUnixTest, SignalWithPointerValue) {
cannam@147 106 // This tests that if we use sigqueue() to attach a value to the signal, that value is received
cannam@147 107 // correctly. Note that this only works on platforms that support real-time signals -- even
cannam@147 108 // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
cannam@147 109 // signals. Hence this test won't run on e.g. Mac OSX.
cannam@147 110 //
cannam@147 111 // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
cannam@147 112 //
cannam@147 113 // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
cannam@147 114 // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
cannam@147 115 // Sad. https://github.com/sandstorm-io/capnproto/issues/204
cannam@147 116
cannam@147 117 captureSignals();
cannam@147 118 UnixEventPort port;
cannam@147 119 EventLoop loop(port);
cannam@147 120 WaitScope waitScope(loop);
cannam@147 121
cannam@147 122 union sigval value;
cannam@147 123 memset(&value, 0, sizeof(value));
cannam@147 124 value.sival_ptr = &port;
cannam@147 125 sigqueue(getpid(), SIGURG, value);
cannam@147 126
cannam@147 127 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
cannam@147 128 EXPECT_EQ(SIGURG, info.si_signo);
cannam@147 129 EXPECT_SI_CODE(SI_QUEUE, info.si_code);
cannam@147 130 EXPECT_EQ(&port, info.si_value.sival_ptr);
cannam@147 131 }
cannam@147 132 #endif
cannam@147 133
cannam@147 134 TEST(AsyncUnixTest, SignalsMultiListen) {
cannam@147 135 captureSignals();
cannam@147 136 UnixEventPort port;
cannam@147 137 EventLoop loop(port);
cannam@147 138 WaitScope waitScope(loop);
cannam@147 139
cannam@147 140 port.onSignal(SIGIO).then([](siginfo_t&&) {
cannam@147 141 KJ_FAIL_EXPECT("Received wrong signal.");
cannam@147 142 }).detach([](kj::Exception&& exception) {
cannam@147 143 KJ_FAIL_EXPECT(exception);
cannam@147 144 });
cannam@147 145
cannam@147 146 kill(getpid(), SIGURG);
cannam@147 147
cannam@147 148 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
cannam@147 149 EXPECT_EQ(SIGURG, info.si_signo);
cannam@147 150 EXPECT_SI_CODE(SI_USER, info.si_code);
cannam@147 151 }
cannam@147 152
cannam@147 153 #if !__CYGWIN32__
cannam@147 154 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
cannam@147 155 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
cannam@147 156 // platform I'm assuming it's a Cygwin bug.
cannam@147 157
cannam@147 158 TEST(AsyncUnixTest, SignalsMultiReceive) {
cannam@147 159 captureSignals();
cannam@147 160 UnixEventPort port;
cannam@147 161 EventLoop loop(port);
cannam@147 162 WaitScope waitScope(loop);
cannam@147 163
cannam@147 164 kill(getpid(), SIGURG);
cannam@147 165 kill(getpid(), SIGIO);
cannam@147 166
cannam@147 167 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
cannam@147 168 EXPECT_EQ(SIGURG, info.si_signo);
cannam@147 169 EXPECT_SI_CODE(SI_USER, info.si_code);
cannam@147 170
cannam@147 171 info = port.onSignal(SIGIO).wait(waitScope);
cannam@147 172 EXPECT_EQ(SIGIO, info.si_signo);
cannam@147 173 EXPECT_SI_CODE(SI_USER, info.si_code);
cannam@147 174 }
cannam@147 175
cannam@147 176 #endif // !__CYGWIN32__
cannam@147 177
cannam@147 178 TEST(AsyncUnixTest, SignalsAsync) {
cannam@147 179 captureSignals();
cannam@147 180 UnixEventPort port;
cannam@147 181 EventLoop loop(port);
cannam@147 182 WaitScope waitScope(loop);
cannam@147 183
cannam@147 184 // Arrange for a signal to be sent from another thread.
cannam@147 185 pthread_t mainThread = pthread_self();
cannam@147 186 Thread thread([&]() {
cannam@147 187 delay();
cannam@147 188 pthread_kill(mainThread, SIGURG);
cannam@147 189 });
cannam@147 190
cannam@147 191 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
cannam@147 192 EXPECT_EQ(SIGURG, info.si_signo);
cannam@147 193 #if __linux__
cannam@147 194 EXPECT_SI_CODE(SI_TKILL, info.si_code);
cannam@147 195 #endif
cannam@147 196 }
cannam@147 197
cannam@147 198 #if !__CYGWIN32__
cannam@147 199 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
cannam@147 200 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
cannam@147 201 // platform I'm assuming it's a Cygwin bug.
cannam@147 202
cannam@147 203 TEST(AsyncUnixTest, SignalsNoWait) {
cannam@147 204 // Verify that UnixEventPort::poll() correctly receives pending signals.
cannam@147 205
cannam@147 206 captureSignals();
cannam@147 207 UnixEventPort port;
cannam@147 208 EventLoop loop(port);
cannam@147 209 WaitScope waitScope(loop);
cannam@147 210
cannam@147 211 bool receivedSigurg = false;
cannam@147 212 bool receivedSigio = false;
cannam@147 213 port.onSignal(SIGURG).then([&](siginfo_t&& info) {
cannam@147 214 receivedSigurg = true;
cannam@147 215 EXPECT_EQ(SIGURG, info.si_signo);
cannam@147 216 EXPECT_SI_CODE(SI_USER, info.si_code);
cannam@147 217 }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
cannam@147 218 port.onSignal(SIGIO).then([&](siginfo_t&& info) {
cannam@147 219 receivedSigio = true;
cannam@147 220 EXPECT_EQ(SIGIO, info.si_signo);
cannam@147 221 EXPECT_SI_CODE(SI_USER, info.si_code);
cannam@147 222 }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
cannam@147 223
cannam@147 224 kill(getpid(), SIGURG);
cannam@147 225 kill(getpid(), SIGIO);
cannam@147 226
cannam@147 227 EXPECT_FALSE(receivedSigurg);
cannam@147 228 EXPECT_FALSE(receivedSigio);
cannam@147 229
cannam@147 230 loop.run();
cannam@147 231
cannam@147 232 EXPECT_FALSE(receivedSigurg);
cannam@147 233 EXPECT_FALSE(receivedSigio);
cannam@147 234
cannam@147 235 port.poll();
cannam@147 236
cannam@147 237 EXPECT_FALSE(receivedSigurg);
cannam@147 238 EXPECT_FALSE(receivedSigio);
cannam@147 239
cannam@147 240 loop.run();
cannam@147 241
cannam@147 242 EXPECT_TRUE(receivedSigurg);
cannam@147 243 EXPECT_TRUE(receivedSigio);
cannam@147 244 }
cannam@147 245
cannam@147 246 #endif // !__CYGWIN32__
cannam@147 247
cannam@147 248 TEST(AsyncUnixTest, ReadObserver) {
cannam@147 249 captureSignals();
cannam@147 250 UnixEventPort port;
cannam@147 251 EventLoop loop(port);
cannam@147 252 WaitScope waitScope(loop);
cannam@147 253
cannam@147 254 int pipefds[2];
cannam@147 255 KJ_SYSCALL(pipe(pipefds));
cannam@147 256 kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
cannam@147 257
cannam@147 258 UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 259
cannam@147 260 KJ_SYSCALL(write(outfd, "foo", 3));
cannam@147 261
cannam@147 262 observer.whenBecomesReadable().wait(waitScope);
cannam@147 263
cannam@147 264 #if __linux__ // platform known to support POLLRDHUP
cannam@147 265 EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));
cannam@147 266
cannam@147 267 char buffer[4096];
cannam@147 268 ssize_t n;
cannam@147 269 KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
cannam@147 270 EXPECT_EQ(3, n);
cannam@147 271
cannam@147 272 KJ_SYSCALL(write(outfd, "bar", 3));
cannam@147 273 outfd = nullptr;
cannam@147 274
cannam@147 275 observer.whenBecomesReadable().wait(waitScope);
cannam@147 276
cannam@147 277 EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
cannam@147 278 #endif
cannam@147 279 }
cannam@147 280
cannam@147 281 TEST(AsyncUnixTest, ReadObserverMultiListen) {
cannam@147 282 captureSignals();
cannam@147 283 UnixEventPort port;
cannam@147 284 EventLoop loop(port);
cannam@147 285 WaitScope waitScope(loop);
cannam@147 286
cannam@147 287 int bogusPipefds[2];
cannam@147 288 KJ_SYSCALL(pipe(bogusPipefds));
cannam@147 289 KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
cannam@147 290
cannam@147 291 UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0],
cannam@147 292 UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 293
cannam@147 294 bogusObserver.whenBecomesReadable().then([]() {
cannam@147 295 ADD_FAILURE() << "Received wrong poll.";
cannam@147 296 }).detach([](kj::Exception&& exception) {
cannam@147 297 ADD_FAILURE() << kj::str(exception).cStr();
cannam@147 298 });
cannam@147 299
cannam@147 300 int pipefds[2];
cannam@147 301 KJ_SYSCALL(pipe(pipefds));
cannam@147 302 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
cannam@147 303
cannam@147 304 UnixEventPort::FdObserver observer(port, pipefds[0],
cannam@147 305 UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 306 KJ_SYSCALL(write(pipefds[1], "foo", 3));
cannam@147 307
cannam@147 308 observer.whenBecomesReadable().wait(waitScope);
cannam@147 309 }
cannam@147 310
cannam@147 311 TEST(AsyncUnixTest, ReadObserverMultiReceive) {
cannam@147 312 captureSignals();
cannam@147 313 UnixEventPort port;
cannam@147 314 EventLoop loop(port);
cannam@147 315 WaitScope waitScope(loop);
cannam@147 316
cannam@147 317 int pipefds[2];
cannam@147 318 KJ_SYSCALL(pipe(pipefds));
cannam@147 319 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
cannam@147 320
cannam@147 321 UnixEventPort::FdObserver observer(port, pipefds[0],
cannam@147 322 UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 323 KJ_SYSCALL(write(pipefds[1], "foo", 3));
cannam@147 324
cannam@147 325 int pipefds2[2];
cannam@147 326 KJ_SYSCALL(pipe(pipefds2));
cannam@147 327 KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
cannam@147 328
cannam@147 329 UnixEventPort::FdObserver observer2(port, pipefds2[0],
cannam@147 330 UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 331 KJ_SYSCALL(write(pipefds2[1], "bar", 3));
cannam@147 332
cannam@147 333 auto promise1 = observer.whenBecomesReadable();
cannam@147 334 auto promise2 = observer2.whenBecomesReadable();
cannam@147 335 promise1.wait(waitScope);
cannam@147 336 promise2.wait(waitScope);
cannam@147 337 }
cannam@147 338
cannam@147 339 TEST(AsyncUnixTest, ReadObserverAsync) {
cannam@147 340 captureSignals();
cannam@147 341 UnixEventPort port;
cannam@147 342 EventLoop loop(port);
cannam@147 343 WaitScope waitScope(loop);
cannam@147 344
cannam@147 345 // Make a pipe and wait on its read end while another thread writes to it.
cannam@147 346 int pipefds[2];
cannam@147 347 KJ_SYSCALL(pipe(pipefds));
cannam@147 348 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
cannam@147 349 UnixEventPort::FdObserver observer(port, pipefds[0],
cannam@147 350 UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 351
cannam@147 352 Thread thread([&]() {
cannam@147 353 delay();
cannam@147 354 KJ_SYSCALL(write(pipefds[1], "foo", 3));
cannam@147 355 });
cannam@147 356
cannam@147 357 // Wait for the event in this thread.
cannam@147 358 observer.whenBecomesReadable().wait(waitScope);
cannam@147 359 }
cannam@147 360
cannam@147 361 TEST(AsyncUnixTest, ReadObserverNoWait) {
cannam@147 362 // Verify that UnixEventPort::poll() correctly receives pending FD events.
cannam@147 363
cannam@147 364 captureSignals();
cannam@147 365 UnixEventPort port;
cannam@147 366 EventLoop loop(port);
cannam@147 367 WaitScope waitScope(loop);
cannam@147 368
cannam@147 369 int pipefds[2];
cannam@147 370 KJ_SYSCALL(pipe(pipefds));
cannam@147 371 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
cannam@147 372 UnixEventPort::FdObserver observer(port, pipefds[0],
cannam@147 373 UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 374
cannam@147 375 int pipefds2[2];
cannam@147 376 KJ_SYSCALL(pipe(pipefds2));
cannam@147 377 KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
cannam@147 378 UnixEventPort::FdObserver observer2(port, pipefds2[0],
cannam@147 379 UnixEventPort::FdObserver::OBSERVE_READ);
cannam@147 380
cannam@147 381 int receivedCount = 0;
cannam@147 382 observer.whenBecomesReadable().then([&]() {
cannam@147 383 receivedCount++;
cannam@147 384 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
cannam@147 385 observer2.whenBecomesReadable().then([&]() {
cannam@147 386 receivedCount++;
cannam@147 387 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
cannam@147 388
cannam@147 389 KJ_SYSCALL(write(pipefds[1], "foo", 3));
cannam@147 390 KJ_SYSCALL(write(pipefds2[1], "bar", 3));
cannam@147 391
cannam@147 392 EXPECT_EQ(0, receivedCount);
cannam@147 393
cannam@147 394 loop.run();
cannam@147 395
cannam@147 396 EXPECT_EQ(0, receivedCount);
cannam@147 397
cannam@147 398 port.poll();
cannam@147 399
cannam@147 400 EXPECT_EQ(0, receivedCount);
cannam@147 401
cannam@147 402 loop.run();
cannam@147 403
cannam@147 404 EXPECT_EQ(2, receivedCount);
cannam@147 405 }
cannam@147 406
cannam@147 407 static void setNonblocking(int fd) {
cannam@147 408 int flags;
cannam@147 409 KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
cannam@147 410 if ((flags & O_NONBLOCK) == 0) {
cannam@147 411 KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
cannam@147 412 }
cannam@147 413 }
cannam@147 414
cannam@147 415 TEST(AsyncUnixTest, WriteObserver) {
cannam@147 416 captureSignals();
cannam@147 417 UnixEventPort port;
cannam@147 418 EventLoop loop(port);
cannam@147 419 WaitScope waitScope(loop);
cannam@147 420
cannam@147 421 int pipefds[2];
cannam@147 422 KJ_SYSCALL(pipe(pipefds));
cannam@147 423 kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
cannam@147 424 setNonblocking(outfd);
cannam@147 425 setNonblocking(infd);
cannam@147 426
cannam@147 427 UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);
cannam@147 428
cannam@147 429 // Fill buffer.
cannam@147 430 ssize_t n;
cannam@147 431 do {
cannam@147 432 KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
cannam@147 433 } while (n >= 0);
cannam@147 434
cannam@147 435 bool writable = false;
cannam@147 436 auto promise = observer.whenBecomesWritable()
cannam@147 437 .then([&]() { writable = true; }).eagerlyEvaluate(nullptr);
cannam@147 438
cannam@147 439 loop.run();
cannam@147 440 port.poll();
cannam@147 441 loop.run();
cannam@147 442
cannam@147 443 EXPECT_FALSE(writable);
cannam@147 444
cannam@147 445 // Empty the read end so that the write end becomes writable. Note that Linux implements a
cannam@147 446 // high watermark / low watermark heuristic which means that only reading one byte is not
cannam@147 447 // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
cannam@147 448 // 1 page. To be safe, we read everything.
cannam@147 449 char buffer[4096];
cannam@147 450 do {
cannam@147 451 KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
cannam@147 452 } while (n > 0);
cannam@147 453
cannam@147 454 loop.run();
cannam@147 455 port.poll();
cannam@147 456 loop.run();
cannam@147 457
cannam@147 458 EXPECT_TRUE(writable);
cannam@147 459 }
cannam@147 460
cannam@147 461 #if !__APPLE__
cannam@147 462 // Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374.
cannam@147 463 TEST(AsyncUnixTest, UrgentObserver) {
cannam@147 464 // Verify that FdObserver correctly detects availability of out-of-band data.
cannam@147 465 // Availability of out-of-band data is implementation-specific.
cannam@147 466 // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
cannam@147 467 // for this test.
cannam@147 468
cannam@147 469 UnixEventPort port;
cannam@147 470 EventLoop loop(port);
cannam@147 471 WaitScope waitScope(loop);
cannam@147 472 int tmpFd;
cannam@147 473 char c;
cannam@147 474
cannam@147 475 // Spawn a TCP server
cannam@147 476 KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
cannam@147 477 kj::AutoCloseFd serverFd(tmpFd);
cannam@147 478 sockaddr_in saddr;
cannam@147 479 memset(&saddr, 0, sizeof(saddr));
cannam@147 480 saddr.sin_family = AF_INET;
cannam@147 481 saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
cannam@147 482 KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
cannam@147 483 socklen_t saddrLen = sizeof(saddr);
cannam@147 484 KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
cannam@147 485 KJ_SYSCALL(listen(serverFd, 1));
cannam@147 486
cannam@147 487 // Accept one connection, send in-band and OOB byte, wait for a quit message
cannam@147 488 Thread thread([&]() {
cannam@147 489 int tmpFd;
cannam@147 490 char c;
cannam@147 491
cannam@147 492 sockaddr_in caddr;
cannam@147 493 socklen_t caddrLen = sizeof(caddr);
cannam@147 494 KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
cannam@147 495 kj::AutoCloseFd clientFd(tmpFd);
cannam@147 496 delay();
cannam@147 497
cannam@147 498 // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
cannam@147 499 c = 'i';
cannam@147 500 KJ_SYSCALL(send(clientFd, &c, 1, 0));
cannam@147 501 c = 'o';
cannam@147 502 KJ_SYSCALL(send(clientFd, &c, 1, MSG_OOB));
cannam@147 503
cannam@147 504 KJ_SYSCALL(recv(clientFd, &c, 1, 0));
cannam@147 505 EXPECT_EQ('q', c);
cannam@147 506 });
cannam@147 507 KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });
cannam@147 508
cannam@147 509 KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
cannam@147 510 kj::AutoCloseFd clientFd(tmpFd);
cannam@147 511 KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));
cannam@147 512
cannam@147 513 UnixEventPort::FdObserver observer(port, clientFd,
cannam@147 514 UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
cannam@147 515
cannam@147 516 observer.whenUrgentDataAvailable().wait(waitScope);
cannam@147 517
cannam@147 518 #if __CYGWIN__
cannam@147 519 // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
cannam@147 520 // such a time as the connection closes -- and then the byte is successfully returned. This
cannam@147 521 // seems to be a cygwin bug.
cannam@147 522 KJ_SYSCALL(recv(clientFd, &c, 1, 0));
cannam@147 523 EXPECT_EQ('i', c);
cannam@147 524 KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
cannam@147 525 EXPECT_EQ('o', c);
cannam@147 526 #else
cannam@147 527 // Attempt to read the urgent byte prior to reading the in-band byte.
cannam@147 528 KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
cannam@147 529 EXPECT_EQ('o', c);
cannam@147 530 KJ_SYSCALL(recv(clientFd, &c, 1, 0));
cannam@147 531 EXPECT_EQ('i', c);
cannam@147 532 #endif
cannam@147 533
cannam@147 534 // Allow server thread to let its clientFd go out of scope.
cannam@147 535 c = 'q';
cannam@147 536 KJ_SYSCALL(send(clientFd, &c, 1, 0));
cannam@147 537 KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
cannam@147 538 }
cannam@147 539 #endif
cannam@147 540
cannam@147 541 TEST(AsyncUnixTest, SteadyTimers) {
cannam@147 542 captureSignals();
cannam@147 543 UnixEventPort port;
cannam@147 544 EventLoop loop(port);
cannam@147 545 WaitScope waitScope(loop);
cannam@147 546
cannam@147 547 auto& timer = port.getTimer();
cannam@147 548
cannam@147 549 auto start = timer.now();
cannam@147 550 kj::Vector<TimePoint> expected;
cannam@147 551 kj::Vector<TimePoint> actual;
cannam@147 552
cannam@147 553 auto addTimer = [&](Duration delay) {
cannam@147 554 expected.add(max(start + delay, start));
cannam@147 555 timer.atTime(start + delay).then([&]() {
cannam@147 556 actual.add(timer.now());
cannam@147 557 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
cannam@147 558 };
cannam@147 559
cannam@147 560 addTimer(30 * MILLISECONDS);
cannam@147 561 addTimer(40 * MILLISECONDS);
cannam@147 562 addTimer(20350 * MICROSECONDS);
cannam@147 563 addTimer(30 * MILLISECONDS);
cannam@147 564 addTimer(-10 * MILLISECONDS);
cannam@147 565
cannam@147 566 std::sort(expected.begin(), expected.end());
cannam@147 567 timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);
cannam@147 568
cannam@147 569 ASSERT_EQ(expected.size(), actual.size());
cannam@147 570 for (int i = 0; i < expected.size(); ++i) {
cannam@147 571 KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
cannam@147 572 i, ((expected[i] - actual[i]) / NANOSECONDS));
cannam@147 573 }
cannam@147 574 }
cannam@147 575
cannam@147 576 TEST(AsyncUnixTest, Wake) {
cannam@147 577 captureSignals();
cannam@147 578 UnixEventPort port;
cannam@147 579 EventLoop loop(port);
cannam@147 580 WaitScope waitScope(loop);
cannam@147 581
cannam@147 582 EXPECT_FALSE(port.poll());
cannam@147 583 port.wake();
cannam@147 584 EXPECT_TRUE(port.poll());
cannam@147 585 EXPECT_FALSE(port.poll());
cannam@147 586
cannam@147 587 port.wake();
cannam@147 588 EXPECT_TRUE(port.wait());
cannam@147 589
cannam@147 590 {
cannam@147 591 auto promise = port.getTimer().atTime(port.getTimer().now());
cannam@147 592 EXPECT_FALSE(port.wait());
cannam@147 593 }
cannam@147 594
cannam@147 595 bool woken = false;
cannam@147 596 Thread thread([&]() {
cannam@147 597 delay();
cannam@147 598 woken = true;
cannam@147 599 port.wake();
cannam@147 600 });
cannam@147 601
cannam@147 602 EXPECT_TRUE(port.wait());
cannam@147 603 }
cannam@147 604
cannam@147 605 } // namespace
cannam@147 606 } // namespace kj
cannam@147 607
cannam@147 608 #endif // !_WIN32