comparison osx/include/kj/async-io.h @ 147:45360b968bf4

Cap'n Proto v0.6 + build for OSX
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 22 May 2017 10:01:37 +0100
parents 41e769c91eca
children
comparison
equal deleted inserted replaced
146:206f0eb279b8 147:45360b968bf4
33 33
34 struct sockaddr; 34 struct sockaddr;
35 35
36 namespace kj { 36 namespace kj {
37 37
38 #if _WIN32
39 class Win32EventPort;
40 #else
38 class UnixEventPort; 41 class UnixEventPort;
42 #endif
43
39 class NetworkAddress; 44 class NetworkAddress;
45 class AsyncOutputStream;
40 46
41 // ======================================================================================= 47 // =======================================================================================
42 // Streaming I/O 48 // Streaming I/O
43 49
44 class AsyncInputStream { 50 class AsyncInputStream {
45 // Asynchronous equivalent of InputStream (from io.h). 51 // Asynchronous equivalent of InputStream (from io.h).
46 52
47 public: 53 public:
48 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) = 0; 54 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes);
49 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; 55 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
50 56
51 Promise<void> read(void* buffer, size_t bytes); 57 Promise<void> read(void* buffer, size_t bytes);
58
59 virtual Maybe<uint64_t> tryGetLength();
60 // Get the remaining number of bytes that will be produced by this stream, if known.
61 //
62 // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
63 // HTTP implementation may need to fall back to Transfer-Encoding: chunked.
64 //
65 // The default implementation always returns null.
66
67 virtual Promise<uint64_t> pumpTo(
68 AsyncOutputStream& output, uint64_t amount = kj::maxValue);
69 // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
70 // total bytes actually pumped (which is only less than `amount` if EOF was reached).
71 //
72 // Override this if your stream type knows how to pump itself to certain kinds of output
73 // streams more efficiently than via the naive approach. You can use
74 // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
75 // delegate to the default implementation.
76 //
77 // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
78 // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.
79
80 Promise<Array<byte>> readAllBytes();
81 Promise<String> readAllText();
82 // Read until EOF and return as one big byte array or string.
52 }; 83 };
53 84
54 class AsyncOutputStream { 85 class AsyncOutputStream {
55 // Asynchronous equivalent of OutputStream (from io.h). 86 // Asynchronous equivalent of OutputStream (from io.h).
56 87
57 public: 88 public:
58 virtual Promise<void> write(const void* buffer, size_t size) = 0; 89 virtual Promise<void> write(const void* buffer, size_t size) = 0;
59 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0; 90 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0;
91
92 virtual Maybe<Promise<uint64_t>> tryPumpFrom(
93 AsyncInputStream& input, uint64_t amount = kj::maxValue);
94 // Implements double-dispatch for AsyncInputStream::pumpTo().
95 //
96 // This method should only be called from within an implementation of pumpTo().
97 //
98 // This method examines the type of `input` to find optimized ways to pump data from it to this
99 // output stream. If it finds one, it performs the pump. Otherwise, it returns null.
100 //
101 // The default implementation always returns null.
60 }; 102 };
61 103
62 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { 104 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
63 // A combination input and output stream. 105 // A combination input and output stream.
64 106
375 // The close-on-exec flag will be set on the descriptor if it is not already. 417 // The close-on-exec flag will be set on the descriptor if it is not already.
376 // 418 //
377 // If this flag is not used, then the file descriptor is not automatically closed and the 419 // If this flag is not used, then the file descriptor is not automatically closed and the
378 // close-on-exec flag is not modified. 420 // close-on-exec flag is not modified.
379 421
422 #if !_WIN32
380 ALREADY_CLOEXEC = 1 << 1, 423 ALREADY_CLOEXEC = 1 << 1,
381 // Indicates that the close-on-exec flag is known already to be set, so need not be set again. 424 // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
382 // Only relevant when combined with TAKE_OWNERSHIP. 425 // Only relevant when combined with TAKE_OWNERSHIP.
383 // 426 //
384 // On Linux, all system calls which yield new file descriptors have flags or variants which 427 // On Linux, all system calls which yield new file descriptors have flags or variants which
389 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode 432 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode
390 // automatically. 433 // automatically.
391 // 434 //
392 // On Linux, all system calls which yield new file descriptors have flags or variants which 435 // On Linux, all system calls which yield new file descriptors have flags or variants which
393 // enable non-blocking mode immediately. Unfortunately, other OS's do not. 436 // enable non-blocking mode immediately. Unfortunately, other OS's do not.
437 #endif
394 }; 438 };
395 439
396 virtual Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) = 0; 440 #if _WIN32
441 typedef uintptr_t Fd;
442 // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the
443 // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify
444 // explicitly).
445 #else
446 typedef int Fd;
447 // On Unix, any arbitrary file descriptor is supported.
448 #endif
449
450 virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0;
397 // Create an AsyncInputStream wrapping a file descriptor. 451 // Create an AsyncInputStream wrapping a file descriptor.
398 // 452 //
399 // `flags` is a bitwise-OR of the values of the `Flags` enum. 453 // `flags` is a bitwise-OR of the values of the `Flags` enum.
400 454
401 virtual Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) = 0; 455 virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0;
402 // Create an AsyncOutputStream wrapping a file descriptor. 456 // Create an AsyncOutputStream wrapping a file descriptor.
403 // 457 //
404 // `flags` is a bitwise-OR of the values of the `Flags` enum. 458 // `flags` is a bitwise-OR of the values of the `Flags` enum.
405 459
406 virtual Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) = 0; 460 virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0;
407 // Create an AsyncIoStream wrapping a socket file descriptor. 461 // Create an AsyncIoStream wrapping a socket file descriptor.
408 // 462 //
409 // `flags` is a bitwise-OR of the values of the `Flags` enum. 463 // `flags` is a bitwise-OR of the values of the `Flags` enum.
410 464
411 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) = 0; 465 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
412 // Create an AsyncIoStream wrapping a socket that is in the process of connecting. The returned 466 Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0;
413 // promise should not resolve until connection has completed -- traditionally indicated by the 467 // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address.
414 // descriptor becoming writable. 468 // The returned promise does not resolve until connection has completed.
415 // 469 //
416 // `flags` is a bitwise-OR of the values of the `Flags` enum. 470 // `flags` is a bitwise-OR of the values of the `Flags` enum.
417 471
418 virtual Own<ConnectionReceiver> wrapListenSocketFd(int fd, uint flags = 0) = 0; 472 virtual Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) = 0;
419 // Create a ConnectionReceiver wrapping a listen socket file descriptor. This socket should already 473 // Create a ConnectionReceiver wrapping a listen socket file descriptor. This socket should already
420 // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. 474 // have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
421 // 475 //
422 // `flags` is a bitwise-OR of the values of the `Flags` enum. 476 // `flags` is a bitwise-OR of the values of the `Flags` enum.
423 477
424 virtual Own<DatagramPort> wrapDatagramSocketFd(int fd, uint flags = 0); 478 virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0);
425 479
426 virtual Timer& getTimer() = 0; 480 virtual Timer& getTimer() = 0;
427 // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- 481 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
428 // it only updates when the event loop polls for system events. This means that calling `now()` 482 // it only updates when the event loop polls for system events. This means that calling `now()`
429 // on this timer does not require a system call. 483 // on this timer does not require a system call.
438 struct AsyncIoContext { 492 struct AsyncIoContext {
439 Own<LowLevelAsyncIoProvider> lowLevelProvider; 493 Own<LowLevelAsyncIoProvider> lowLevelProvider;
440 Own<AsyncIoProvider> provider; 494 Own<AsyncIoProvider> provider;
441 WaitScope& waitScope; 495 WaitScope& waitScope;
442 496
497 #if _WIN32
498 Win32EventPort& win32EventPort;
499 #else
443 UnixEventPort& unixEventPort; 500 UnixEventPort& unixEventPort;
444 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This 501 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
445 // field will go away at some point when we have a chance to improve these interfaces. 502 // field will go away at some point when we have a chance to improve these interfaces.
503 #endif
446 }; 504 };
447 505
448 AsyncIoContext setupAsyncIo(); 506 AsyncIoContext setupAsyncIo();
449 // Convenience method which sets up the current thread with everything it needs to do async I/O. 507 // Convenience method which sets up the current thread with everything it needs to do async I/O.
450 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for 508 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for