Mercurial > hg > sv-dependency-builds
diff osx/include/kj/async-io.h @ 147:45360b968bf4
Cap'n Proto v0.6 + build for OSX
author | Chris Cannam <cannam@all-day-breakfast.com> |
---|---|
date | Mon, 22 May 2017 10:01:37 +0100 |
parents | 41e769c91eca |
children |
line wrap: on
line diff
--- a/osx/include/kj/async-io.h Mon Mar 06 13:29:58 2017 +0000 +++ b/osx/include/kj/async-io.h Mon May 22 10:01:37 2017 +0100 @@ -1,502 +1,560 @@ -// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors -// Licensed under the MIT License: -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -#ifndef KJ_ASYNC_IO_H_ -#define KJ_ASYNC_IO_H_ - -#if defined(__GNUC__) && !KJ_HEADER_WARNINGS -#pragma GCC system_header -#endif - -#include "async.h" -#include "function.h" -#include "thread.h" -#include "time.h" - -struct sockaddr; - -namespace kj { - -class UnixEventPort; -class NetworkAddress; - -// ======================================================================================= -// Streaming I/O - -class AsyncInputStream { - // Asynchronous equivalent of InputStream (from io.h). - -public: - virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) = 0; - virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; - - Promise<void> read(void* buffer, size_t bytes); -}; - -class AsyncOutputStream { - // Asynchronous equivalent of OutputStream (from io.h). - -public: - virtual Promise<void> write(const void* buffer, size_t size) = 0; - virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0; -}; - -class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { - // A combination input and output stream. - -public: - virtual void shutdownWrite() = 0; - // Cleanly shut down just the write end of the stream, while keeping the read end open. - - virtual void abortRead() {} - // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only - // be called when an error has occurred. - - virtual void getsockopt(int level, int option, void* value, uint* length); - virtual void setsockopt(int level, int option, const void* value, uint length); - // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception - // if the stream is not a socket or the option is not appropriate for the socket type. The - // default implementations always throw "unimplemented". - - virtual void getsockname(struct sockaddr* addr, uint* length); - virtual void getpeername(struct sockaddr* addr, uint* length); - // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" - // exception if the stream is not a socket. The default implementations always throw - // "unimplemented". - // - // Note that we don't provide methods that return NetworkAddress because it usually wouldn't - // be useful. You can't connect() to or listen() on these addresses, obviously, because they are - // ephemeral addresses for a single connection. -}; - -struct OneWayPipe { - // A data pipe with an input end and an output end. (Typically backed by pipe() system call.) - - Own<AsyncInputStream> in; - Own<AsyncOutputStream> out; -}; - -struct TwoWayPipe { - // A data pipe that supports sending in both directions. Each end's output sends data to the - // other end's input. (Typically backed by socketpair() system call.) - - Own<AsyncIoStream> ends[2]; -}; - -class ConnectionReceiver { - // Represents a server socket listening on a port. - -public: - virtual Promise<Own<AsyncIoStream>> accept() = 0; - // Accept the next incoming connection. - - virtual uint getPort() = 0; - // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't - // specify a port when constructing the NetworkAddress -- one will have been assigned - // automatically. - - virtual void getsockopt(int level, int option, void* value, uint* length); - virtual void setsockopt(int level, int option, const void* value, uint length); - // Same as the methods of AsyncIoStream. -}; - -// ======================================================================================= -// Datagram I/O - -class AncillaryMessage { - // Represents an ancillary message (aka control message) received using the recvmsg() system - // call (or equivalent). Most apps will not use this. - -public: - inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data); - AncillaryMessage() = default; - - inline int getLevel() const; - // Originating protocol / socket level. - - inline int getType() const; - // Protocol-specific message type. - - template <typename T> - inline Maybe<const T&> as(); - // Interpret the ancillary message as the given struct type. Most ancillary messages are some - // sort of struct, so this is a convenient way to access it. Returns nullptr if the message - // is smaller than the struct -- this can happen if the message was truncated due to - // insufficient ancillary buffer space. - - template <typename T> - inline ArrayPtr<const T> asArray(); - // Interpret the ancillary message as an array of items. If the message size does not evenly - // divide into elements of type T, the remainder is discarded -- this can happen if the message - // was truncated due to insufficient ancillary buffer space. - -private: - int level; - int type; - ArrayPtr<const byte> data; - // Message data. In most cases you should use `as()` or `asArray()`. -}; - -class DatagramReceiver { - // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's - // capacity in advance; if a received packet is larger than the capacity, it will be truncated. - -public: - virtual Promise<void> receive() = 0; - // Receive a new message, overwriting this object's content. - // - // receive() may reuse the same buffers for content and ancillary data with each call. - - template <typename T> - struct MaybeTruncated { - T value; - - bool isTruncated; - // True if the Receiver's capacity was insufficient to receive the value and therefore the - // value is truncated. - }; - - virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0; - // Get the content of the datagram. - - virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0; - // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr - // struct. Most apps don't need this. - // - // If the returned value is truncated, then the last message in the array may itself be - // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will - // return fewer elements than expected. Truncation can also mean that additional messages were - // available but discarded. - - virtual NetworkAddress& getSource() = 0; - // Get the datagram sender's address. - - struct Capacity { - size_t content = 8192; - // How much space to allocate for the datagram content. If a datagram is received that is - // larger than this, it will be truncated, with no way to recover the tail. - - size_t ancillary = 0; - // How much space to allocate for ancillary messages. As with content, if the ancillary data - // is larger than this, it will be truncated. - }; -}; - -class DatagramPort { -public: - virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0; - virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces, - NetworkAddress& destination) = 0; - - virtual Own<DatagramReceiver> makeReceiver( - DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; - // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much - // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`. - - virtual uint getPort() = 0; - // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't - // specify a port when constructing the NetworkAddress -- one will have been assigned - // automatically. - - virtual void getsockopt(int level, int option, void* value, uint* length); - virtual void setsockopt(int level, int option, const void* value, uint length); - // Same as the methods of AsyncIoStream. -}; - -// ======================================================================================= -// Networks - -class NetworkAddress { - // Represents a remote address to which the application can connect. - -public: - virtual Promise<Own<AsyncIoStream>> connect() = 0; - // Make a new connection to this address. - // - // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. - - virtual Own<ConnectionReceiver> listen() = 0; - // Listen for incoming connections on this address. - // - // The address must be local. - - virtual Own<DatagramPort> bindDatagramPort(); - // Open this address as a datagram (e.g. UDP) port. - // - // The address must be local. - - virtual Own<NetworkAddress> clone() = 0; - // Returns an equivalent copy of this NetworkAddress. - - virtual String toString() = 0; - // Produce a human-readable string which hopefully can be passed to Network::parseAddress() - // to reproduce this address, although whether or not that works of course depends on the Network - // implementation. This should be called only to display the address to human users, who will - // hopefully know what they are able to do with it. -}; - -class Network { - // Factory for NetworkAddress instances, representing the network services offered by the - // operating system. - // - // This interface typically represents broad authority, and well-designed code should limit its - // use to high-level startup code and user interaction. Low-level APIs should accept - // NetworkAddress instances directly and work from there, if at all possible. - -public: - virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0; - // Construct a network address from a user-provided string. The format of the address - // strings is not specified at the API level, and application code should make no assumptions - // about them. These strings should always be provided by humans, and said humans will know - // what format to use in their particular context. - // - // `portHint`, if provided, specifies the "standard" IP port number for the application-level - // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a - // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is - // omitted, then the returned address will only support listen() and bindDatagramPort() - // (not connect()), and an unused port will be chosen each time one of those methods is called. - - virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0; - // Construct a network address from a legacy struct sockaddr. -}; - -// ======================================================================================= -// I/O Provider - -class AsyncIoProvider { - // Class which constructs asynchronous wrappers around the operating system's I/O facilities. - // - // Generally, the implementation of this interface must integrate closely with a particular - // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide - // an AsyncIoProvider. - -public: - virtual OneWayPipe newOneWayPipe() = 0; - // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with - // the pipe(2) system call). - - virtual TwoWayPipe newTwoWayPipe() = 0; - // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with - // socketpair(2) system call). Data written to one end can be read from the other. - - virtual Network& getNetwork() = 0; - // Creates a new `Network` instance representing the networks exposed by the operating system. - // - // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If - // you call this from low-level code, then you are preventing higher-level code from injecting an - // alternative implementation. Instead, if your code needs to use network functionality, it - // should ask for a `Network` as a constructor or method parameter, so that higher-level code can - // chose what implementation to use. The system network is essentially a singleton. See: - // http://www.object-oriented-security.org/lets-argue/singletons - // - // Code that uses the system network should not make any assumptions about what kinds of - // addresses it will parse, as this could differ across platforms. String addresses should come - // strictly from the user, who will know how to write them correctly for their system. - // - // With that said, KJ currently supports the following string address formats: - // - IPv4: "1.2.3.4", "1.2.3.4:80" - // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" - // - Local IP wildcard (covers both v4 and v6): "*", "*:80" - // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" - // - Unix domain: "unix:/path/to/socket" - - struct PipeThread { - // A combination of a thread and a two-way pipe that communicates with that thread. - // - // The fields are intentionally ordered so that the pipe will be destroyed (and therefore - // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread - // arranges to exit when it detects disconnect, destruction should be clean. - - Own<Thread> thread; - Own<AsyncIoStream> pipe; - }; - - virtual PipeThread newPipeThread( - Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0; - // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate - // with it. One end of the pipe is passed to the thread's start function and the other end of - // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will - // already have an active `EventLoop` when `startFunc` is called. - // - // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too - // much at once but I'm not sure how to cleanly break it down. - - virtual Timer& getTimer() = 0; - // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- - // it only updates when the event loop polls for system events. This means that calling `now()` - // on this timer does not require a system call. - // - // This timer is not affected by changes to the system date. It is unspecified whether the timer - // continues to count while the system is suspended. -}; - -class LowLevelAsyncIoProvider { - // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on - // different operating systems. You should prefer to use `AsyncIoProvider` over this interface - // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. - // - // On Unix, this interface can be used to import native file descriptors into the async framework. - // Different implementations of this interface might work on top of different event handling - // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. - // - // On Windows, this interface can be used to import native HANDLEs into the async framework. - // Different implementations of this interface might work on top of different event handling - // primitives, such as I/O completion ports vs. completion routines. - // - // TODO(port): Actually implement Windows support. - -public: - // --------------------------------------------------------------------------- - // Unix-specific stuff - - enum Flags { - // Flags controlling how to wrap a file descriptor. - - TAKE_OWNERSHIP = 1 << 0, - // The returned object should own the file descriptor, automatically closing it when destroyed. - // The close-on-exec flag will be set on the descriptor if it is not already. - // - // If this flag is not used, then the file descriptor is not automatically closed and the - // close-on-exec flag is not modified. - - ALREADY_CLOEXEC = 1 << 1, - // Indicates that the close-on-exec flag is known already to be set, so need not be set again. - // Only relevant when combined with TAKE_OWNERSHIP. - // - // On Linux, all system calls which yield new file descriptors have flags or variants which - // set the close-on-exec flag immediately. Unfortunately, other OS's do not. - - ALREADY_NONBLOCK = 1 << 2 - // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag - // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode - // automatically. - // - // On Linux, all system calls which yield new file descriptors have flags or variants which - // enable non-blocking mode immediately. Unfortunately, other OS's do not. - }; - - virtual Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) = 0; - // Create an AsyncInputStream wrapping a file descriptor. - // - // `flags` is a bitwise-OR of the values of the `Flags` enum. - - virtual Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) = 0; - // Create an AsyncOutputStream wrapping a file descriptor. - // - // `flags` is a bitwise-OR of the values of the `Flags` enum. - - virtual Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) = 0; - // Create an AsyncIoStream wrapping a socket file descriptor. - // - // `flags` is a bitwise-OR of the values of the `Flags` enum. - - virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) = 0; - // Create an AsyncIoStream wrapping a socket that is in the process of connecting. The returned - // promise should not resolve until connection has completed -- traditionally indicated by the - // descriptor becoming writable. - // - // `flags` is a bitwise-OR of the values of the `Flags` enum. - - virtual Own<ConnectionReceiver> wrapListenSocketFd(int fd, uint flags = 0) = 0; - // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already - // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. - // - // `flags` is a bitwise-OR of the values of the `Flags` enum. - - virtual Own<DatagramPort> wrapDatagramSocketFd(int fd, uint flags = 0); - - virtual Timer& getTimer() = 0; - // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- - // it only updates when the event loop polls for system events. This means that calling `now()` - // on this timer does not require a system call. - // - // This timer is not affected by changes to the system date. It is unspecified whether the timer - // continues to count while the system is suspended. -}; - -Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); -// Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. - -struct AsyncIoContext { - Own<LowLevelAsyncIoProvider> lowLevelProvider; - Own<AsyncIoProvider> provider; - WaitScope& waitScope; - - UnixEventPort& unixEventPort; - // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This - // field will go away at some point when we have a chance to improve these interfaces. -}; - -AsyncIoContext setupAsyncIo(); -// Convenience method which sets up the current thread with everything it needs to do async I/O. -// The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for -// doing I/O on the host system, so everything is ready for the thread to start making async calls -// and waiting on promises. -// -// You would typically call this in your main() loop or in the start function of a thread. -// Example: -// -// int main() { -// auto ioContext = kj::setupAsyncIo(); -// -// // Now we can call an async function. -// Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com"); -// -// // And we can wait for the promise to complete. Note that you can only use `wait()` -// // from the top level, not from inside a promise callback. -// String text = textPromise.wait(ioContext.waitScope); -// print(text); -// return 0; -// } -// -// WARNING: An AsyncIoContext can only be used in the thread and process that created it. In -// particular, note that after a fork(), an AsyncIoContext created in the parent process will -// not work correctly in the child, even if the parent ceases to use its copy. In particular -// note that this means that server processes which daemonize themselves at startup must wait -// until after daemonization to create an AsyncIoContext. - -// ======================================================================================= -// inline implementation details - -inline AncillaryMessage::AncillaryMessage( - int level, int type, ArrayPtr<const byte> data) - : level(level), type(type), data(data) {} - -inline int AncillaryMessage::getLevel() const { return level; } -inline int AncillaryMessage::getType() const { return type; } - -template <typename T> -inline Maybe<const T&> AncillaryMessage::as() { - if (data.size() >= sizeof(T)) { - return *reinterpret_cast<const T*>(data.begin()); - } else { - return nullptr; - } -} - -template <typename T> -inline ArrayPtr<const T> AncillaryMessage::asArray() { - return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T)); -} - -} // namespace kj - -#endif // KJ_ASYNC_IO_H_ +// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors +// Licensed under the MIT License: +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +#ifndef KJ_ASYNC_IO_H_ +#define KJ_ASYNC_IO_H_ + +#if defined(__GNUC__) && !KJ_HEADER_WARNINGS +#pragma GCC system_header +#endif + +#include "async.h" +#include "function.h" +#include "thread.h" +#include "time.h" + +struct sockaddr; + +namespace kj { + +#if _WIN32 +class Win32EventPort; +#else +class UnixEventPort; +#endif + +class NetworkAddress; +class AsyncOutputStream; + +// ======================================================================================= +// Streaming I/O + +class AsyncInputStream { + // Asynchronous equivalent of InputStream (from io.h). + +public: + virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes); + virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; + + Promise<void> read(void* buffer, size_t bytes); + + virtual Maybe<uint64_t> tryGetLength(); + // Get the remaining number of bytes that will be produced by this stream, if known. + // + // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the + // HTTP implementation may need to fall back to Transfer-Encoding: chunked. + // + // The default implementation always returns null. + + virtual Promise<uint64_t> pumpTo( + AsyncOutputStream& output, uint64_t amount = kj::maxValue); + // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the + // total bytes actually pumped (which is only less than `amount` if EOF was reached). + // + // Override this if your stream type knows how to pump itself to certain kinds of output + // streams more efficiently than via the naive approach. You can use + // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match, + // delegate to the default implementation. + // + // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it + // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop. + + Promise<Array<byte>> readAllBytes(); + Promise<String> readAllText(); + // Read until EOF and return as one big byte array or string. +}; + +class AsyncOutputStream { + // Asynchronous equivalent of OutputStream (from io.h). + +public: + virtual Promise<void> write(const void* buffer, size_t size) = 0; + virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0; + + virtual Maybe<Promise<uint64_t>> tryPumpFrom( + AsyncInputStream& input, uint64_t amount = kj::maxValue); + // Implements double-dispatch for AsyncInputStream::pumpTo(). + // + // This method should only be called from within an implementation of pumpTo(). + // + // This method examines the type of `input` to find optimized ways to pump data from it to this + // output stream. If it finds one, it performs the pump. Otherwise, it returns null. + // + // The default implementation always returns null. +}; + +class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { + // A combination input and output stream. + +public: + virtual void shutdownWrite() = 0; + // Cleanly shut down just the write end of the stream, while keeping the read end open. + + virtual void abortRead() {} + // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only + // be called when an error has occurred. + + virtual void getsockopt(int level, int option, void* value, uint* length); + virtual void setsockopt(int level, int option, const void* value, uint length); + // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception + // if the stream is not a socket or the option is not appropriate for the socket type. The + // default implementations always throw "unimplemented". + + virtual void getsockname(struct sockaddr* addr, uint* length); + virtual void getpeername(struct sockaddr* addr, uint* length); + // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" + // exception if the stream is not a socket. The default implementations always throw + // "unimplemented". + // + // Note that we don't provide methods that return NetworkAddress because it usually wouldn't + // be useful. You can't connect() to or listen() on these addresses, obviously, because they are + // ephemeral addresses for a single connection. +}; + +struct OneWayPipe { + // A data pipe with an input end and an output end. (Typically backed by pipe() system call.) + + Own<AsyncInputStream> in; + Own<AsyncOutputStream> out; +}; + +struct TwoWayPipe { + // A data pipe that supports sending in both directions. Each end's output sends data to the + // other end's input. (Typically backed by socketpair() system call.) + + Own<AsyncIoStream> ends[2]; +}; + +class ConnectionReceiver { + // Represents a server socket listening on a port. + +public: + virtual Promise<Own<AsyncIoStream>> accept() = 0; + // Accept the next incoming connection. + + virtual uint getPort() = 0; + // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't + // specify a port when constructing the NetworkAddress -- one will have been assigned + // automatically. + + virtual void getsockopt(int level, int option, void* value, uint* length); + virtual void setsockopt(int level, int option, const void* value, uint length); + // Same as the methods of AsyncIoStream. +}; + +// ======================================================================================= +// Datagram I/O + +class AncillaryMessage { + // Represents an ancillary message (aka control message) received using the recvmsg() system + // call (or equivalent). Most apps will not use this. + +public: + inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data); + AncillaryMessage() = default; + + inline int getLevel() const; + // Originating protocol / socket level. + + inline int getType() const; + // Protocol-specific message type. + + template <typename T> + inline Maybe<const T&> as(); + // Interpret the ancillary message as the given struct type. Most ancillary messages are some + // sort of struct, so this is a convenient way to access it. Returns nullptr if the message + // is smaller than the struct -- this can happen if the message was truncated due to + // insufficient ancillary buffer space. + + template <typename T> + inline ArrayPtr<const T> asArray(); + // Interpret the ancillary message as an array of items. If the message size does not evenly + // divide into elements of type T, the remainder is discarded -- this can happen if the message + // was truncated due to insufficient ancillary buffer space. + +private: + int level; + int type; + ArrayPtr<const byte> data; + // Message data. In most cases you should use `as()` or `asArray()`. +}; + +class DatagramReceiver { + // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's + // capacity in advance; if a received packet is larger than the capacity, it will be truncated. + +public: + virtual Promise<void> receive() = 0; + // Receive a new message, overwriting this object's content. + // + // receive() may reuse the same buffers for content and ancillary data with each call. + + template <typename T> + struct MaybeTruncated { + T value; + + bool isTruncated; + // True if the Receiver's capacity was insufficient to receive the value and therefore the + // value is truncated. + }; + + virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0; + // Get the content of the datagram. + + virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0; + // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr + // struct. Most apps don't need this. + // + // If the returned value is truncated, then the last message in the array may itself be + // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will + // return fewer elements than expected. Truncation can also mean that additional messages were + // available but discarded. + + virtual NetworkAddress& getSource() = 0; + // Get the datagram sender's address. + + struct Capacity { + size_t content = 8192; + // How much space to allocate for the datagram content. If a datagram is received that is + // larger than this, it will be truncated, with no way to recover the tail. + + size_t ancillary = 0; + // How much space to allocate for ancillary messages. As with content, if the ancillary data + // is larger than this, it will be truncated. + }; +}; + +class DatagramPort { +public: + virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0; + virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces, + NetworkAddress& destination) = 0; + + virtual Own<DatagramReceiver> makeReceiver( + DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; + // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much + // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`. + + virtual uint getPort() = 0; + // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't + // specify a port when constructing the NetworkAddress -- one will have been assigned + // automatically. + + virtual void getsockopt(int level, int option, void* value, uint* length); + virtual void setsockopt(int level, int option, const void* value, uint length); + // Same as the methods of AsyncIoStream. +}; + +// ======================================================================================= +// Networks + +class NetworkAddress { + // Represents a remote address to which the application can connect. + +public: + virtual Promise<Own<AsyncIoStream>> connect() = 0; + // Make a new connection to this address. + // + // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. + + virtual Own<ConnectionReceiver> listen() = 0; + // Listen for incoming connections on this address. + // + // The address must be local. + + virtual Own<DatagramPort> bindDatagramPort(); + // Open this address as a datagram (e.g. UDP) port. + // + // The address must be local. + + virtual Own<NetworkAddress> clone() = 0; + // Returns an equivalent copy of this NetworkAddress. + + virtual String toString() = 0; + // Produce a human-readable string which hopefully can be passed to Network::parseAddress() + // to reproduce this address, although whether or not that works of course depends on the Network + // implementation. This should be called only to display the address to human users, who will + // hopefully know what they are able to do with it. +}; + +class Network { + // Factory for NetworkAddress instances, representing the network services offered by the + // operating system. + // + // This interface typically represents broad authority, and well-designed code should limit its + // use to high-level startup code and user interaction. Low-level APIs should accept + // NetworkAddress instances directly and work from there, if at all possible. + +public: + virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0; + // Construct a network address from a user-provided string. The format of the address + // strings is not specified at the API level, and application code should make no assumptions + // about them. These strings should always be provided by humans, and said humans will know + // what format to use in their particular context. + // + // `portHint`, if provided, specifies the "standard" IP port number for the application-level + // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a + // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is + // omitted, then the returned address will only support listen() and bindDatagramPort() + // (not connect()), and an unused port will be chosen each time one of those methods is called. + + virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0; + // Construct a network address from a legacy struct sockaddr. +}; + +// ======================================================================================= +// I/O Provider + +class AsyncIoProvider { + // Class which constructs asynchronous wrappers around the operating system's I/O facilities. + // + // Generally, the implementation of this interface must integrate closely with a particular + // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide + // an AsyncIoProvider. + +public: + virtual OneWayPipe newOneWayPipe() = 0; + // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with + // the pipe(2) system call). + + virtual TwoWayPipe newTwoWayPipe() = 0; + // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with + // socketpair(2) system call). Data written to one end can be read from the other. + + virtual Network& getNetwork() = 0; + // Creates a new `Network` instance representing the networks exposed by the operating system. + // + // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If + // you call this from low-level code, then you are preventing higher-level code from injecting an + // alternative implementation. Instead, if your code needs to use network functionality, it + // should ask for a `Network` as a constructor or method parameter, so that higher-level code can + // chose what implementation to use. The system network is essentially a singleton. See: + // http://www.object-oriented-security.org/lets-argue/singletons + // + // Code that uses the system network should not make any assumptions about what kinds of + // addresses it will parse, as this could differ across platforms. String addresses should come + // strictly from the user, who will know how to write them correctly for their system. + // + // With that said, KJ currently supports the following string address formats: + // - IPv4: "1.2.3.4", "1.2.3.4:80" + // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" + // - Local IP wildcard (covers both v4 and v6): "*", "*:80" + // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" + // - Unix domain: "unix:/path/to/socket" + + struct PipeThread { + // A combination of a thread and a two-way pipe that communicates with that thread. + // + // The fields are intentionally ordered so that the pipe will be destroyed (and therefore + // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread + // arranges to exit when it detects disconnect, destruction should be clean. + + Own<Thread> thread; + Own<AsyncIoStream> pipe; + }; + + virtual PipeThread newPipeThread( + Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0; + // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate + // with it. One end of the pipe is passed to the thread's start function and the other end of + // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will + // already have an active `EventLoop` when `startFunc` is called. + // + // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too + // much at once but I'm not sure how to cleanly break it down. + + virtual Timer& getTimer() = 0; + // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- + // it only updates when the event loop polls for system events. This means that calling `now()` + // on this timer does not require a system call. + // + // This timer is not affected by changes to the system date. It is unspecified whether the timer + // continues to count while the system is suspended. +}; + +class LowLevelAsyncIoProvider { + // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on + // different operating systems. You should prefer to use `AsyncIoProvider` over this interface + // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. + // + // On Unix, this interface can be used to import native file descriptors into the async framework. + // Different implementations of this interface might work on top of different event handling + // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. + // + // On Windows, this interface can be used to import native HANDLEs into the async framework. + // Different implementations of this interface might work on top of different event handling + // primitives, such as I/O completion ports vs. completion routines. + // + // TODO(port): Actually implement Windows support. + +public: + // --------------------------------------------------------------------------- + // Unix-specific stuff + + enum Flags { + // Flags controlling how to wrap a file descriptor. + + TAKE_OWNERSHIP = 1 << 0, + // The returned object should own the file descriptor, automatically closing it when destroyed. + // The close-on-exec flag will be set on the descriptor if it is not already. + // + // If this flag is not used, then the file descriptor is not automatically closed and the + // close-on-exec flag is not modified. + +#if !_WIN32 + ALREADY_CLOEXEC = 1 << 1, + // Indicates that the close-on-exec flag is known already to be set, so need not be set again. + // Only relevant when combined with TAKE_OWNERSHIP. + // + // On Linux, all system calls which yield new file descriptors have flags or variants which + // set the close-on-exec flag immediately. Unfortunately, other OS's do not. + + ALREADY_NONBLOCK = 1 << 2 + // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag + // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode + // automatically. + // + // On Linux, all system calls which yield new file descriptors have flags or variants which + // enable non-blocking mode immediately. Unfortunately, other OS's do not. +#endif + }; + +#if _WIN32 + typedef uintptr_t Fd; + // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the + // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify + // explicitly). +#else + typedef int Fd; + // On Unix, any arbitrary file descriptor is supported. +#endif + + virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0; + // Create an AsyncInputStream wrapping a file descriptor. + // + // `flags` is a bitwise-OR of the values of the `Flags` enum. + + virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0; + // Create an AsyncOutputStream wrapping a file descriptor. + // + // `flags` is a bitwise-OR of the values of the `Flags` enum. + + virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0; + // Create an AsyncIoStream wrapping a socket file descriptor. + // + // `flags` is a bitwise-OR of the values of the `Flags` enum. + + virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( + Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0; + // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address. + // The returned promise does not resolve until connection has completed. + // + // `flags` is a bitwise-OR of the values of the `Flags` enum. + + virtual Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) = 0; + // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already + // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. + // + // `flags` is a bitwise-OR of the values of the `Flags` enum. + + virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0); + + virtual Timer& getTimer() = 0; + // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- + // it only updates when the event loop polls for system events. This means that calling `now()` + // on this timer does not require a system call. + // + // This timer is not affected by changes to the system date. It is unspecified whether the timer + // continues to count while the system is suspended. +}; + +Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); +// Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. + +struct AsyncIoContext { + Own<LowLevelAsyncIoProvider> lowLevelProvider; + Own<AsyncIoProvider> provider; + WaitScope& waitScope; + +#if _WIN32 + Win32EventPort& win32EventPort; +#else + UnixEventPort& unixEventPort; + // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This + // field will go away at some point when we have a chance to improve these interfaces. +#endif +}; + +AsyncIoContext setupAsyncIo(); +// Convenience method which sets up the current thread with everything it needs to do async I/O. +// The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for +// doing I/O on the host system, so everything is ready for the thread to start making async calls +// and waiting on promises. +// +// You would typically call this in your main() loop or in the start function of a thread. +// Example: +// +// int main() { +// auto ioContext = kj::setupAsyncIo(); +// +// // Now we can call an async function. +// Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com"); +// +// // And we can wait for the promise to complete. Note that you can only use `wait()` +// // from the top level, not from inside a promise callback. +// String text = textPromise.wait(ioContext.waitScope); +// print(text); +// return 0; +// } +// +// WARNING: An AsyncIoContext can only be used in the thread and process that created it. In +// particular, note that after a fork(), an AsyncIoContext created in the parent process will +// not work correctly in the child, even if the parent ceases to use its copy. In particular +// note that this means that server processes which daemonize themselves at startup must wait +// until after daemonization to create an AsyncIoContext. + +// ======================================================================================= +// inline implementation details + +inline AncillaryMessage::AncillaryMessage( + int level, int type, ArrayPtr<const byte> data) + : level(level), type(type), data(data) {} + +inline int AncillaryMessage::getLevel() const { return level; } +inline int AncillaryMessage::getType() const { return type; } + +template <typename T> +inline Maybe<const T&> AncillaryMessage::as() { + if (data.size() >= sizeof(T)) { + return *reinterpret_cast<const T*>(data.begin()); + } else { + return nullptr; + } +} + +template <typename T> +inline ArrayPtr<const T> AncillaryMessage::asArray() { + return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T)); +} + +} // namespace kj + +#endif // KJ_ASYNC_IO_H_