cannam@147
|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
|
cannam@147
|
2 // Licensed under the MIT License:
|
cannam@147
|
3 //
|
cannam@147
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
cannam@147
|
5 // of this software and associated documentation files (the "Software"), to deal
|
cannam@147
|
6 // in the Software without restriction, including without limitation the rights
|
cannam@147
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
cannam@147
|
8 // copies of the Software, and to permit persons to whom the Software is
|
cannam@147
|
9 // furnished to do so, subject to the following conditions:
|
cannam@147
|
10 //
|
cannam@147
|
11 // The above copyright notice and this permission notice shall be included in
|
cannam@147
|
12 // all copies or substantial portions of the Software.
|
cannam@147
|
13 //
|
cannam@147
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
cannam@147
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
cannam@147
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
cannam@147
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
cannam@147
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
cannam@147
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
cannam@147
|
20 // THE SOFTWARE.
|
cannam@147
|
21
|
cannam@147
|
22 #if !_WIN32
|
cannam@147
|
23
|
cannam@147
|
24 #include "async-unix.h"
|
cannam@147
|
25 #include "thread.h"
|
cannam@147
|
26 #include "debug.h"
|
cannam@147
|
27 #include "io.h"
|
cannam@147
|
28 #include <unistd.h>
|
cannam@147
|
29 #include <fcntl.h>
|
cannam@147
|
30 #include <sys/types.h>
|
cannam@147
|
31 #include <sys/socket.h>
|
cannam@147
|
32 #include <sys/stat.h>
|
cannam@147
|
33 #include <netinet/in.h>
|
cannam@147
|
34 #include <kj/compat/gtest.h>
|
cannam@147
|
35 #include <pthread.h>
|
cannam@147
|
36 #include <algorithm>
|
cannam@147
|
37
|
cannam@147
|
38 namespace kj {
|
cannam@147
|
39 namespace {
|
cannam@147
|
40
|
cannam@147
|
41 inline void delay() { usleep(10000); }
|
cannam@147
|
42
|
cannam@147
|
43 // On OSX, si_code seems to be zero when SI_USER is expected.
|
cannam@147
|
44 #if __linux__ || __CYGWIN__
|
cannam@147
|
45 #define EXPECT_SI_CODE EXPECT_EQ
|
cannam@147
|
46 #else
|
cannam@147
|
47 #define EXPECT_SI_CODE(a,b)
|
cannam@147
|
48 #endif
|
cannam@147
|
49
|
cannam@147
|
50 void captureSignals() {
|
cannam@147
|
51 static bool captured = false;
|
cannam@147
|
52 if (!captured) {
|
cannam@147
|
53 captured = true;
|
cannam@147
|
54
|
cannam@147
|
55 // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
|
cannam@147
|
56 // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test. We can't
|
cannam@147
|
57 // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
|
cannam@147
|
58 UnixEventPort::captureSignal(SIGURG);
|
cannam@147
|
59 UnixEventPort::captureSignal(SIGIO);
|
cannam@147
|
60 }
|
cannam@147
|
61 }
|
cannam@147
|
62
|
cannam@147
|
63 TEST(AsyncUnixTest, Signals) {
|
cannam@147
|
64 captureSignals();
|
cannam@147
|
65 UnixEventPort port;
|
cannam@147
|
66 EventLoop loop(port);
|
cannam@147
|
67 WaitScope waitScope(loop);
|
cannam@147
|
68
|
cannam@147
|
69 kill(getpid(), SIGURG);
|
cannam@147
|
70
|
cannam@147
|
71 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
|
cannam@147
|
72 EXPECT_EQ(SIGURG, info.si_signo);
|
cannam@147
|
73 EXPECT_SI_CODE(SI_USER, info.si_code);
|
cannam@147
|
74 }
|
cannam@147
|
75
|
cannam@147
|
76 #if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
|
cannam@147
|
77 TEST(AsyncUnixTest, SignalWithValue) {
|
cannam@147
|
78 // This tests that if we use sigqueue() to attach a value to the signal, that value is received
|
cannam@147
|
79 // correctly. Note that this only works on platforms that support real-time signals -- even
|
cannam@147
|
80 // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
|
cannam@147
|
81 // signals. Hence this test won't run on e.g. Mac OSX.
|
cannam@147
|
82 //
|
cannam@147
|
83 // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
|
cannam@147
|
84 //
|
cannam@147
|
85 // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
|
cannam@147
|
86 // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
|
cannam@147
|
87 // Sad. https://github.com/sandstorm-io/capnproto/issues/204
|
cannam@147
|
88
|
cannam@147
|
89 captureSignals();
|
cannam@147
|
90 UnixEventPort port;
|
cannam@147
|
91 EventLoop loop(port);
|
cannam@147
|
92 WaitScope waitScope(loop);
|
cannam@147
|
93
|
cannam@147
|
94 union sigval value;
|
cannam@147
|
95 memset(&value, 0, sizeof(value));
|
cannam@147
|
96 value.sival_int = 123;
|
cannam@147
|
97 sigqueue(getpid(), SIGURG, value);
|
cannam@147
|
98
|
cannam@147
|
99 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
|
cannam@147
|
100 EXPECT_EQ(SIGURG, info.si_signo);
|
cannam@147
|
101 EXPECT_SI_CODE(SI_QUEUE, info.si_code);
|
cannam@147
|
102 EXPECT_EQ(123, info.si_value.sival_int);
|
cannam@147
|
103 }
|
cannam@147
|
104
|
cannam@147
|
105 TEST(AsyncUnixTest, SignalWithPointerValue) {
|
cannam@147
|
106 // This tests that if we use sigqueue() to attach a value to the signal, that value is received
|
cannam@147
|
107 // correctly. Note that this only works on platforms that support real-time signals -- even
|
cannam@147
|
108 // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
|
cannam@147
|
109 // signals. Hence this test won't run on e.g. Mac OSX.
|
cannam@147
|
110 //
|
cannam@147
|
111 // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
|
cannam@147
|
112 //
|
cannam@147
|
113 // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
|
cannam@147
|
114 // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
|
cannam@147
|
115 // Sad. https://github.com/sandstorm-io/capnproto/issues/204
|
cannam@147
|
116
|
cannam@147
|
117 captureSignals();
|
cannam@147
|
118 UnixEventPort port;
|
cannam@147
|
119 EventLoop loop(port);
|
cannam@147
|
120 WaitScope waitScope(loop);
|
cannam@147
|
121
|
cannam@147
|
122 union sigval value;
|
cannam@147
|
123 memset(&value, 0, sizeof(value));
|
cannam@147
|
124 value.sival_ptr = &port;
|
cannam@147
|
125 sigqueue(getpid(), SIGURG, value);
|
cannam@147
|
126
|
cannam@147
|
127 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
|
cannam@147
|
128 EXPECT_EQ(SIGURG, info.si_signo);
|
cannam@147
|
129 EXPECT_SI_CODE(SI_QUEUE, info.si_code);
|
cannam@147
|
130 EXPECT_EQ(&port, info.si_value.sival_ptr);
|
cannam@147
|
131 }
|
cannam@147
|
132 #endif
|
cannam@147
|
133
|
cannam@147
|
134 TEST(AsyncUnixTest, SignalsMultiListen) {
|
cannam@147
|
135 captureSignals();
|
cannam@147
|
136 UnixEventPort port;
|
cannam@147
|
137 EventLoop loop(port);
|
cannam@147
|
138 WaitScope waitScope(loop);
|
cannam@147
|
139
|
cannam@147
|
140 port.onSignal(SIGIO).then([](siginfo_t&&) {
|
cannam@147
|
141 KJ_FAIL_EXPECT("Received wrong signal.");
|
cannam@147
|
142 }).detach([](kj::Exception&& exception) {
|
cannam@147
|
143 KJ_FAIL_EXPECT(exception);
|
cannam@147
|
144 });
|
cannam@147
|
145
|
cannam@147
|
146 kill(getpid(), SIGURG);
|
cannam@147
|
147
|
cannam@147
|
148 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
|
cannam@147
|
149 EXPECT_EQ(SIGURG, info.si_signo);
|
cannam@147
|
150 EXPECT_SI_CODE(SI_USER, info.si_code);
|
cannam@147
|
151 }
|
cannam@147
|
152
|
cannam@147
|
153 #if !__CYGWIN32__
|
cannam@147
|
154 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
|
cannam@147
|
155 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
|
cannam@147
|
156 // platform I'm assuming it's a Cygwin bug.
|
cannam@147
|
157
|
cannam@147
|
158 TEST(AsyncUnixTest, SignalsMultiReceive) {
|
cannam@147
|
159 captureSignals();
|
cannam@147
|
160 UnixEventPort port;
|
cannam@147
|
161 EventLoop loop(port);
|
cannam@147
|
162 WaitScope waitScope(loop);
|
cannam@147
|
163
|
cannam@147
|
164 kill(getpid(), SIGURG);
|
cannam@147
|
165 kill(getpid(), SIGIO);
|
cannam@147
|
166
|
cannam@147
|
167 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
|
cannam@147
|
168 EXPECT_EQ(SIGURG, info.si_signo);
|
cannam@147
|
169 EXPECT_SI_CODE(SI_USER, info.si_code);
|
cannam@147
|
170
|
cannam@147
|
171 info = port.onSignal(SIGIO).wait(waitScope);
|
cannam@147
|
172 EXPECT_EQ(SIGIO, info.si_signo);
|
cannam@147
|
173 EXPECT_SI_CODE(SI_USER, info.si_code);
|
cannam@147
|
174 }
|
cannam@147
|
175
|
cannam@147
|
176 #endif // !__CYGWIN32__
|
cannam@147
|
177
|
cannam@147
|
178 TEST(AsyncUnixTest, SignalsAsync) {
|
cannam@147
|
179 captureSignals();
|
cannam@147
|
180 UnixEventPort port;
|
cannam@147
|
181 EventLoop loop(port);
|
cannam@147
|
182 WaitScope waitScope(loop);
|
cannam@147
|
183
|
cannam@147
|
184 // Arrange for a signal to be sent from another thread.
|
cannam@147
|
185 pthread_t mainThread = pthread_self();
|
cannam@147
|
186 Thread thread([&]() {
|
cannam@147
|
187 delay();
|
cannam@147
|
188 pthread_kill(mainThread, SIGURG);
|
cannam@147
|
189 });
|
cannam@147
|
190
|
cannam@147
|
191 siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
|
cannam@147
|
192 EXPECT_EQ(SIGURG, info.si_signo);
|
cannam@147
|
193 #if __linux__
|
cannam@147
|
194 EXPECT_SI_CODE(SI_TKILL, info.si_code);
|
cannam@147
|
195 #endif
|
cannam@147
|
196 }
|
cannam@147
|
197
|
cannam@147
|
198 #if !__CYGWIN32__
|
cannam@147
|
199 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
|
cannam@147
|
200 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
|
cannam@147
|
201 // platform I'm assuming it's a Cygwin bug.
|
cannam@147
|
202
|
cannam@147
|
203 TEST(AsyncUnixTest, SignalsNoWait) {
|
cannam@147
|
204 // Verify that UnixEventPort::poll() correctly receives pending signals.
|
cannam@147
|
205
|
cannam@147
|
206 captureSignals();
|
cannam@147
|
207 UnixEventPort port;
|
cannam@147
|
208 EventLoop loop(port);
|
cannam@147
|
209 WaitScope waitScope(loop);
|
cannam@147
|
210
|
cannam@147
|
211 bool receivedSigurg = false;
|
cannam@147
|
212 bool receivedSigio = false;
|
cannam@147
|
213 port.onSignal(SIGURG).then([&](siginfo_t&& info) {
|
cannam@147
|
214 receivedSigurg = true;
|
cannam@147
|
215 EXPECT_EQ(SIGURG, info.si_signo);
|
cannam@147
|
216 EXPECT_SI_CODE(SI_USER, info.si_code);
|
cannam@147
|
217 }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
|
cannam@147
|
218 port.onSignal(SIGIO).then([&](siginfo_t&& info) {
|
cannam@147
|
219 receivedSigio = true;
|
cannam@147
|
220 EXPECT_EQ(SIGIO, info.si_signo);
|
cannam@147
|
221 EXPECT_SI_CODE(SI_USER, info.si_code);
|
cannam@147
|
222 }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
|
cannam@147
|
223
|
cannam@147
|
224 kill(getpid(), SIGURG);
|
cannam@147
|
225 kill(getpid(), SIGIO);
|
cannam@147
|
226
|
cannam@147
|
227 EXPECT_FALSE(receivedSigurg);
|
cannam@147
|
228 EXPECT_FALSE(receivedSigio);
|
cannam@147
|
229
|
cannam@147
|
230 loop.run();
|
cannam@147
|
231
|
cannam@147
|
232 EXPECT_FALSE(receivedSigurg);
|
cannam@147
|
233 EXPECT_FALSE(receivedSigio);
|
cannam@147
|
234
|
cannam@147
|
235 port.poll();
|
cannam@147
|
236
|
cannam@147
|
237 EXPECT_FALSE(receivedSigurg);
|
cannam@147
|
238 EXPECT_FALSE(receivedSigio);
|
cannam@147
|
239
|
cannam@147
|
240 loop.run();
|
cannam@147
|
241
|
cannam@147
|
242 EXPECT_TRUE(receivedSigurg);
|
cannam@147
|
243 EXPECT_TRUE(receivedSigio);
|
cannam@147
|
244 }
|
cannam@147
|
245
|
cannam@147
|
246 #endif // !__CYGWIN32__
|
cannam@147
|
247
|
cannam@147
|
248 TEST(AsyncUnixTest, ReadObserver) {
|
cannam@147
|
249 captureSignals();
|
cannam@147
|
250 UnixEventPort port;
|
cannam@147
|
251 EventLoop loop(port);
|
cannam@147
|
252 WaitScope waitScope(loop);
|
cannam@147
|
253
|
cannam@147
|
254 int pipefds[2];
|
cannam@147
|
255 KJ_SYSCALL(pipe(pipefds));
|
cannam@147
|
256 kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
|
cannam@147
|
257
|
cannam@147
|
258 UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
259
|
cannam@147
|
260 KJ_SYSCALL(write(outfd, "foo", 3));
|
cannam@147
|
261
|
cannam@147
|
262 observer.whenBecomesReadable().wait(waitScope);
|
cannam@147
|
263
|
cannam@147
|
264 #if __linux__ // platform known to support POLLRDHUP
|
cannam@147
|
265 EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));
|
cannam@147
|
266
|
cannam@147
|
267 char buffer[4096];
|
cannam@147
|
268 ssize_t n;
|
cannam@147
|
269 KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
|
cannam@147
|
270 EXPECT_EQ(3, n);
|
cannam@147
|
271
|
cannam@147
|
272 KJ_SYSCALL(write(outfd, "bar", 3));
|
cannam@147
|
273 outfd = nullptr;
|
cannam@147
|
274
|
cannam@147
|
275 observer.whenBecomesReadable().wait(waitScope);
|
cannam@147
|
276
|
cannam@147
|
277 EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
|
cannam@147
|
278 #endif
|
cannam@147
|
279 }
|
cannam@147
|
280
|
cannam@147
|
281 TEST(AsyncUnixTest, ReadObserverMultiListen) {
|
cannam@147
|
282 captureSignals();
|
cannam@147
|
283 UnixEventPort port;
|
cannam@147
|
284 EventLoop loop(port);
|
cannam@147
|
285 WaitScope waitScope(loop);
|
cannam@147
|
286
|
cannam@147
|
287 int bogusPipefds[2];
|
cannam@147
|
288 KJ_SYSCALL(pipe(bogusPipefds));
|
cannam@147
|
289 KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
|
cannam@147
|
290
|
cannam@147
|
291 UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0],
|
cannam@147
|
292 UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
293
|
cannam@147
|
294 bogusObserver.whenBecomesReadable().then([]() {
|
cannam@147
|
295 ADD_FAILURE() << "Received wrong poll.";
|
cannam@147
|
296 }).detach([](kj::Exception&& exception) {
|
cannam@147
|
297 ADD_FAILURE() << kj::str(exception).cStr();
|
cannam@147
|
298 });
|
cannam@147
|
299
|
cannam@147
|
300 int pipefds[2];
|
cannam@147
|
301 KJ_SYSCALL(pipe(pipefds));
|
cannam@147
|
302 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
|
cannam@147
|
303
|
cannam@147
|
304 UnixEventPort::FdObserver observer(port, pipefds[0],
|
cannam@147
|
305 UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
306 KJ_SYSCALL(write(pipefds[1], "foo", 3));
|
cannam@147
|
307
|
cannam@147
|
308 observer.whenBecomesReadable().wait(waitScope);
|
cannam@147
|
309 }
|
cannam@147
|
310
|
cannam@147
|
311 TEST(AsyncUnixTest, ReadObserverMultiReceive) {
|
cannam@147
|
312 captureSignals();
|
cannam@147
|
313 UnixEventPort port;
|
cannam@147
|
314 EventLoop loop(port);
|
cannam@147
|
315 WaitScope waitScope(loop);
|
cannam@147
|
316
|
cannam@147
|
317 int pipefds[2];
|
cannam@147
|
318 KJ_SYSCALL(pipe(pipefds));
|
cannam@147
|
319 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
|
cannam@147
|
320
|
cannam@147
|
321 UnixEventPort::FdObserver observer(port, pipefds[0],
|
cannam@147
|
322 UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
323 KJ_SYSCALL(write(pipefds[1], "foo", 3));
|
cannam@147
|
324
|
cannam@147
|
325 int pipefds2[2];
|
cannam@147
|
326 KJ_SYSCALL(pipe(pipefds2));
|
cannam@147
|
327 KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
|
cannam@147
|
328
|
cannam@147
|
329 UnixEventPort::FdObserver observer2(port, pipefds2[0],
|
cannam@147
|
330 UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
331 KJ_SYSCALL(write(pipefds2[1], "bar", 3));
|
cannam@147
|
332
|
cannam@147
|
333 auto promise1 = observer.whenBecomesReadable();
|
cannam@147
|
334 auto promise2 = observer2.whenBecomesReadable();
|
cannam@147
|
335 promise1.wait(waitScope);
|
cannam@147
|
336 promise2.wait(waitScope);
|
cannam@147
|
337 }
|
cannam@147
|
338
|
cannam@147
|
339 TEST(AsyncUnixTest, ReadObserverAsync) {
|
cannam@147
|
340 captureSignals();
|
cannam@147
|
341 UnixEventPort port;
|
cannam@147
|
342 EventLoop loop(port);
|
cannam@147
|
343 WaitScope waitScope(loop);
|
cannam@147
|
344
|
cannam@147
|
345 // Make a pipe and wait on its read end while another thread writes to it.
|
cannam@147
|
346 int pipefds[2];
|
cannam@147
|
347 KJ_SYSCALL(pipe(pipefds));
|
cannam@147
|
348 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
|
cannam@147
|
349 UnixEventPort::FdObserver observer(port, pipefds[0],
|
cannam@147
|
350 UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
351
|
cannam@147
|
352 Thread thread([&]() {
|
cannam@147
|
353 delay();
|
cannam@147
|
354 KJ_SYSCALL(write(pipefds[1], "foo", 3));
|
cannam@147
|
355 });
|
cannam@147
|
356
|
cannam@147
|
357 // Wait for the event in this thread.
|
cannam@147
|
358 observer.whenBecomesReadable().wait(waitScope);
|
cannam@147
|
359 }
|
cannam@147
|
360
|
cannam@147
|
361 TEST(AsyncUnixTest, ReadObserverNoWait) {
|
cannam@147
|
362 // Verify that UnixEventPort::poll() correctly receives pending FD events.
|
cannam@147
|
363
|
cannam@147
|
364 captureSignals();
|
cannam@147
|
365 UnixEventPort port;
|
cannam@147
|
366 EventLoop loop(port);
|
cannam@147
|
367 WaitScope waitScope(loop);
|
cannam@147
|
368
|
cannam@147
|
369 int pipefds[2];
|
cannam@147
|
370 KJ_SYSCALL(pipe(pipefds));
|
cannam@147
|
371 KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
|
cannam@147
|
372 UnixEventPort::FdObserver observer(port, pipefds[0],
|
cannam@147
|
373 UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
374
|
cannam@147
|
375 int pipefds2[2];
|
cannam@147
|
376 KJ_SYSCALL(pipe(pipefds2));
|
cannam@147
|
377 KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
|
cannam@147
|
378 UnixEventPort::FdObserver observer2(port, pipefds2[0],
|
cannam@147
|
379 UnixEventPort::FdObserver::OBSERVE_READ);
|
cannam@147
|
380
|
cannam@147
|
381 int receivedCount = 0;
|
cannam@147
|
382 observer.whenBecomesReadable().then([&]() {
|
cannam@147
|
383 receivedCount++;
|
cannam@147
|
384 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
|
cannam@147
|
385 observer2.whenBecomesReadable().then([&]() {
|
cannam@147
|
386 receivedCount++;
|
cannam@147
|
387 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
|
cannam@147
|
388
|
cannam@147
|
389 KJ_SYSCALL(write(pipefds[1], "foo", 3));
|
cannam@147
|
390 KJ_SYSCALL(write(pipefds2[1], "bar", 3));
|
cannam@147
|
391
|
cannam@147
|
392 EXPECT_EQ(0, receivedCount);
|
cannam@147
|
393
|
cannam@147
|
394 loop.run();
|
cannam@147
|
395
|
cannam@147
|
396 EXPECT_EQ(0, receivedCount);
|
cannam@147
|
397
|
cannam@147
|
398 port.poll();
|
cannam@147
|
399
|
cannam@147
|
400 EXPECT_EQ(0, receivedCount);
|
cannam@147
|
401
|
cannam@147
|
402 loop.run();
|
cannam@147
|
403
|
cannam@147
|
404 EXPECT_EQ(2, receivedCount);
|
cannam@147
|
405 }
|
cannam@147
|
406
|
cannam@147
|
407 static void setNonblocking(int fd) {
|
cannam@147
|
408 int flags;
|
cannam@147
|
409 KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
|
cannam@147
|
410 if ((flags & O_NONBLOCK) == 0) {
|
cannam@147
|
411 KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
|
cannam@147
|
412 }
|
cannam@147
|
413 }
|
cannam@147
|
414
|
cannam@147
|
415 TEST(AsyncUnixTest, WriteObserver) {
|
cannam@147
|
416 captureSignals();
|
cannam@147
|
417 UnixEventPort port;
|
cannam@147
|
418 EventLoop loop(port);
|
cannam@147
|
419 WaitScope waitScope(loop);
|
cannam@147
|
420
|
cannam@147
|
421 int pipefds[2];
|
cannam@147
|
422 KJ_SYSCALL(pipe(pipefds));
|
cannam@147
|
423 kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
|
cannam@147
|
424 setNonblocking(outfd);
|
cannam@147
|
425 setNonblocking(infd);
|
cannam@147
|
426
|
cannam@147
|
427 UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);
|
cannam@147
|
428
|
cannam@147
|
429 // Fill buffer.
|
cannam@147
|
430 ssize_t n;
|
cannam@147
|
431 do {
|
cannam@147
|
432 KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
|
cannam@147
|
433 } while (n >= 0);
|
cannam@147
|
434
|
cannam@147
|
435 bool writable = false;
|
cannam@147
|
436 auto promise = observer.whenBecomesWritable()
|
cannam@147
|
437 .then([&]() { writable = true; }).eagerlyEvaluate(nullptr);
|
cannam@147
|
438
|
cannam@147
|
439 loop.run();
|
cannam@147
|
440 port.poll();
|
cannam@147
|
441 loop.run();
|
cannam@147
|
442
|
cannam@147
|
443 EXPECT_FALSE(writable);
|
cannam@147
|
444
|
cannam@147
|
445 // Empty the read end so that the write end becomes writable. Note that Linux implements a
|
cannam@147
|
446 // high watermark / low watermark heuristic which means that only reading one byte is not
|
cannam@147
|
447 // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
|
cannam@147
|
448 // 1 page. To be safe, we read everything.
|
cannam@147
|
449 char buffer[4096];
|
cannam@147
|
450 do {
|
cannam@147
|
451 KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
|
cannam@147
|
452 } while (n > 0);
|
cannam@147
|
453
|
cannam@147
|
454 loop.run();
|
cannam@147
|
455 port.poll();
|
cannam@147
|
456 loop.run();
|
cannam@147
|
457
|
cannam@147
|
458 EXPECT_TRUE(writable);
|
cannam@147
|
459 }
|
cannam@147
|
460
|
cannam@147
|
461 #if !__APPLE__
|
cannam@147
|
462 // Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374.
|
cannam@147
|
463 TEST(AsyncUnixTest, UrgentObserver) {
|
cannam@147
|
464 // Verify that FdObserver correctly detects availability of out-of-band data.
|
cannam@147
|
465 // Availability of out-of-band data is implementation-specific.
|
cannam@147
|
466 // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
|
cannam@147
|
467 // for this test.
|
cannam@147
|
468
|
cannam@147
|
469 UnixEventPort port;
|
cannam@147
|
470 EventLoop loop(port);
|
cannam@147
|
471 WaitScope waitScope(loop);
|
cannam@147
|
472 int tmpFd;
|
cannam@147
|
473 char c;
|
cannam@147
|
474
|
cannam@147
|
475 // Spawn a TCP server
|
cannam@147
|
476 KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
|
cannam@147
|
477 kj::AutoCloseFd serverFd(tmpFd);
|
cannam@147
|
478 sockaddr_in saddr;
|
cannam@147
|
479 memset(&saddr, 0, sizeof(saddr));
|
cannam@147
|
480 saddr.sin_family = AF_INET;
|
cannam@147
|
481 saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
cannam@147
|
482 KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
|
cannam@147
|
483 socklen_t saddrLen = sizeof(saddr);
|
cannam@147
|
484 KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
|
cannam@147
|
485 KJ_SYSCALL(listen(serverFd, 1));
|
cannam@147
|
486
|
cannam@147
|
487 // Accept one connection, send in-band and OOB byte, wait for a quit message
|
cannam@147
|
488 Thread thread([&]() {
|
cannam@147
|
489 int tmpFd;
|
cannam@147
|
490 char c;
|
cannam@147
|
491
|
cannam@147
|
492 sockaddr_in caddr;
|
cannam@147
|
493 socklen_t caddrLen = sizeof(caddr);
|
cannam@147
|
494 KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
|
cannam@147
|
495 kj::AutoCloseFd clientFd(tmpFd);
|
cannam@147
|
496 delay();
|
cannam@147
|
497
|
cannam@147
|
498 // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
|
cannam@147
|
499 c = 'i';
|
cannam@147
|
500 KJ_SYSCALL(send(clientFd, &c, 1, 0));
|
cannam@147
|
501 c = 'o';
|
cannam@147
|
502 KJ_SYSCALL(send(clientFd, &c, 1, MSG_OOB));
|
cannam@147
|
503
|
cannam@147
|
504 KJ_SYSCALL(recv(clientFd, &c, 1, 0));
|
cannam@147
|
505 EXPECT_EQ('q', c);
|
cannam@147
|
506 });
|
cannam@147
|
507 KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });
|
cannam@147
|
508
|
cannam@147
|
509 KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
|
cannam@147
|
510 kj::AutoCloseFd clientFd(tmpFd);
|
cannam@147
|
511 KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));
|
cannam@147
|
512
|
cannam@147
|
513 UnixEventPort::FdObserver observer(port, clientFd,
|
cannam@147
|
514 UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
|
cannam@147
|
515
|
cannam@147
|
516 observer.whenUrgentDataAvailable().wait(waitScope);
|
cannam@147
|
517
|
cannam@147
|
518 #if __CYGWIN__
|
cannam@147
|
519 // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
|
cannam@147
|
520 // such a time as the connection closes -- and then the byte is successfully returned. This
|
cannam@147
|
521 // seems to be a cygwin bug.
|
cannam@147
|
522 KJ_SYSCALL(recv(clientFd, &c, 1, 0));
|
cannam@147
|
523 EXPECT_EQ('i', c);
|
cannam@147
|
524 KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
|
cannam@147
|
525 EXPECT_EQ('o', c);
|
cannam@147
|
526 #else
|
cannam@147
|
527 // Attempt to read the urgent byte prior to reading the in-band byte.
|
cannam@147
|
528 KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
|
cannam@147
|
529 EXPECT_EQ('o', c);
|
cannam@147
|
530 KJ_SYSCALL(recv(clientFd, &c, 1, 0));
|
cannam@147
|
531 EXPECT_EQ('i', c);
|
cannam@147
|
532 #endif
|
cannam@147
|
533
|
cannam@147
|
534 // Allow server thread to let its clientFd go out of scope.
|
cannam@147
|
535 c = 'q';
|
cannam@147
|
536 KJ_SYSCALL(send(clientFd, &c, 1, 0));
|
cannam@147
|
537 KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
|
cannam@147
|
538 }
|
cannam@147
|
539 #endif
|
cannam@147
|
540
|
cannam@147
|
541 TEST(AsyncUnixTest, SteadyTimers) {
|
cannam@147
|
542 captureSignals();
|
cannam@147
|
543 UnixEventPort port;
|
cannam@147
|
544 EventLoop loop(port);
|
cannam@147
|
545 WaitScope waitScope(loop);
|
cannam@147
|
546
|
cannam@147
|
547 auto& timer = port.getTimer();
|
cannam@147
|
548
|
cannam@147
|
549 auto start = timer.now();
|
cannam@147
|
550 kj::Vector<TimePoint> expected;
|
cannam@147
|
551 kj::Vector<TimePoint> actual;
|
cannam@147
|
552
|
cannam@147
|
553 auto addTimer = [&](Duration delay) {
|
cannam@147
|
554 expected.add(max(start + delay, start));
|
cannam@147
|
555 timer.atTime(start + delay).then([&]() {
|
cannam@147
|
556 actual.add(timer.now());
|
cannam@147
|
557 }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
|
cannam@147
|
558 };
|
cannam@147
|
559
|
cannam@147
|
560 addTimer(30 * MILLISECONDS);
|
cannam@147
|
561 addTimer(40 * MILLISECONDS);
|
cannam@147
|
562 addTimer(20350 * MICROSECONDS);
|
cannam@147
|
563 addTimer(30 * MILLISECONDS);
|
cannam@147
|
564 addTimer(-10 * MILLISECONDS);
|
cannam@147
|
565
|
cannam@147
|
566 std::sort(expected.begin(), expected.end());
|
cannam@147
|
567 timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);
|
cannam@147
|
568
|
cannam@147
|
569 ASSERT_EQ(expected.size(), actual.size());
|
cannam@147
|
570 for (int i = 0; i < expected.size(); ++i) {
|
cannam@147
|
571 KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
|
cannam@147
|
572 i, ((expected[i] - actual[i]) / NANOSECONDS));
|
cannam@147
|
573 }
|
cannam@147
|
574 }
|
cannam@147
|
575
|
cannam@147
|
576 TEST(AsyncUnixTest, Wake) {
|
cannam@147
|
577 captureSignals();
|
cannam@147
|
578 UnixEventPort port;
|
cannam@147
|
579 EventLoop loop(port);
|
cannam@147
|
580 WaitScope waitScope(loop);
|
cannam@147
|
581
|
cannam@147
|
582 EXPECT_FALSE(port.poll());
|
cannam@147
|
583 port.wake();
|
cannam@147
|
584 EXPECT_TRUE(port.poll());
|
cannam@147
|
585 EXPECT_FALSE(port.poll());
|
cannam@147
|
586
|
cannam@147
|
587 port.wake();
|
cannam@147
|
588 EXPECT_TRUE(port.wait());
|
cannam@147
|
589
|
cannam@147
|
590 {
|
cannam@147
|
591 auto promise = port.getTimer().atTime(port.getTimer().now());
|
cannam@147
|
592 EXPECT_FALSE(port.wait());
|
cannam@147
|
593 }
|
cannam@147
|
594
|
cannam@147
|
595 bool woken = false;
|
cannam@147
|
596 Thread thread([&]() {
|
cannam@147
|
597 delay();
|
cannam@147
|
598 woken = true;
|
cannam@147
|
599 port.wake();
|
cannam@147
|
600 });
|
cannam@147
|
601
|
cannam@147
|
602 EXPECT_TRUE(port.wait());
|
cannam@147
|
603 }
|
cannam@147
|
604
|
cannam@147
|
605 } // namespace
|
cannam@147
|
606 } // namespace kj
|
cannam@147
|
607
|
cannam@147
|
608 #endif // !_WIN32
|