cannam@147: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors cannam@147: // Licensed under the MIT License: cannam@147: // cannam@147: // Permission is hereby granted, free of charge, to any person obtaining a copy cannam@147: // of this software and associated documentation files (the "Software"), to deal cannam@147: // in the Software without restriction, including without limitation the rights cannam@147: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell cannam@147: // copies of the Software, and to permit persons to whom the Software is cannam@147: // furnished to do so, subject to the following conditions: cannam@147: // cannam@147: // The above copyright notice and this permission notice shall be included in cannam@147: // all copies or substantial portions of the Software. cannam@147: // cannam@147: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR cannam@147: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, cannam@147: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE cannam@147: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER cannam@147: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, cannam@147: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN cannam@147: // THE SOFTWARE. cannam@147: cannam@147: #ifndef KJ_ASYNC_IO_H_ cannam@147: #define KJ_ASYNC_IO_H_ cannam@147: cannam@147: #if defined(__GNUC__) && !KJ_HEADER_WARNINGS cannam@147: #pragma GCC system_header cannam@147: #endif cannam@147: cannam@147: #include "async.h" cannam@147: #include "function.h" cannam@147: #include "thread.h" cannam@147: #include "time.h" cannam@147: cannam@147: struct sockaddr; cannam@147: cannam@147: namespace kj { cannam@147: cannam@147: #if _WIN32 cannam@147: class Win32EventPort; cannam@147: #else cannam@147: class UnixEventPort; cannam@147: #endif cannam@147: cannam@147: class NetworkAddress; cannam@147: class AsyncOutputStream; cannam@147: cannam@147: // ======================================================================================= cannam@147: // Streaming I/O cannam@147: cannam@147: class AsyncInputStream { cannam@147: // Asynchronous equivalent of InputStream (from io.h). cannam@147: cannam@147: public: cannam@147: virtual Promise read(void* buffer, size_t minBytes, size_t maxBytes); cannam@147: virtual Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; cannam@147: cannam@147: Promise read(void* buffer, size_t bytes); cannam@147: cannam@147: virtual Maybe tryGetLength(); cannam@147: // Get the remaining number of bytes that will be produced by this stream, if known. cannam@147: // cannam@147: // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the cannam@147: // HTTP implementation may need to fall back to Transfer-Encoding: chunked. cannam@147: // cannam@147: // The default implementation always returns null. cannam@147: cannam@147: virtual Promise pumpTo( cannam@147: AsyncOutputStream& output, uint64_t amount = kj::maxValue); cannam@147: // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the cannam@147: // total bytes actually pumped (which is only less than `amount` if EOF was reached). cannam@147: // cannam@147: // Override this if your stream type knows how to pump itself to certain kinds of output cannam@147: // streams more efficiently than via the naive approach. You can use cannam@147: // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match, cannam@147: // delegate to the default implementation. cannam@147: // cannam@147: // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it cannam@147: // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop. cannam@147: cannam@147: Promise> readAllBytes(); cannam@147: Promise readAllText(); cannam@147: // Read until EOF and return as one big byte array or string. cannam@147: }; cannam@147: cannam@147: class AsyncOutputStream { cannam@147: // Asynchronous equivalent of OutputStream (from io.h). cannam@147: cannam@147: public: cannam@147: virtual Promise write(const void* buffer, size_t size) = 0; cannam@147: virtual Promise write(ArrayPtr> pieces) = 0; cannam@147: cannam@147: virtual Maybe> tryPumpFrom( cannam@147: AsyncInputStream& input, uint64_t amount = kj::maxValue); cannam@147: // Implements double-dispatch for AsyncInputStream::pumpTo(). cannam@147: // cannam@147: // This method should only be called from within an implementation of pumpTo(). cannam@147: // cannam@147: // This method examines the type of `input` to find optimized ways to pump data from it to this cannam@147: // output stream. If it finds one, it performs the pump. Otherwise, it returns null. cannam@147: // cannam@147: // The default implementation always returns null. cannam@147: }; cannam@147: cannam@147: class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { cannam@147: // A combination input and output stream. cannam@147: cannam@147: public: cannam@147: virtual void shutdownWrite() = 0; cannam@147: // Cleanly shut down just the write end of the stream, while keeping the read end open. cannam@147: cannam@147: virtual void abortRead() {} cannam@147: // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only cannam@147: // be called when an error has occurred. cannam@147: cannam@147: virtual void getsockopt(int level, int option, void* value, uint* length); cannam@147: virtual void setsockopt(int level, int option, const void* value, uint length); cannam@147: // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception cannam@147: // if the stream is not a socket or the option is not appropriate for the socket type. The cannam@147: // default implementations always throw "unimplemented". cannam@147: cannam@147: virtual void getsockname(struct sockaddr* addr, uint* length); cannam@147: virtual void getpeername(struct sockaddr* addr, uint* length); cannam@147: // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" cannam@147: // exception if the stream is not a socket. The default implementations always throw cannam@147: // "unimplemented". cannam@147: // cannam@147: // Note that we don't provide methods that return NetworkAddress because it usually wouldn't cannam@147: // be useful. You can't connect() to or listen() on these addresses, obviously, because they are cannam@147: // ephemeral addresses for a single connection. cannam@147: }; cannam@147: cannam@147: struct OneWayPipe { cannam@147: // A data pipe with an input end and an output end. (Typically backed by pipe() system call.) cannam@147: cannam@147: Own in; cannam@147: Own out; cannam@147: }; cannam@147: cannam@147: struct TwoWayPipe { cannam@147: // A data pipe that supports sending in both directions. Each end's output sends data to the cannam@147: // other end's input. (Typically backed by socketpair() system call.) cannam@147: cannam@147: Own ends[2]; cannam@147: }; cannam@147: cannam@147: class ConnectionReceiver { cannam@147: // Represents a server socket listening on a port. cannam@147: cannam@147: public: cannam@147: virtual Promise> accept() = 0; cannam@147: // Accept the next incoming connection. cannam@147: cannam@147: virtual uint getPort() = 0; cannam@147: // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't cannam@147: // specify a port when constructing the NetworkAddress -- one will have been assigned cannam@147: // automatically. cannam@147: cannam@147: virtual void getsockopt(int level, int option, void* value, uint* length); cannam@147: virtual void setsockopt(int level, int option, const void* value, uint length); cannam@147: // Same as the methods of AsyncIoStream. cannam@147: }; cannam@147: cannam@147: // ======================================================================================= cannam@147: // Datagram I/O cannam@147: cannam@147: class AncillaryMessage { cannam@147: // Represents an ancillary message (aka control message) received using the recvmsg() system cannam@147: // call (or equivalent). Most apps will not use this. cannam@147: cannam@147: public: cannam@147: inline AncillaryMessage(int level, int type, ArrayPtr data); cannam@147: AncillaryMessage() = default; cannam@147: cannam@147: inline int getLevel() const; cannam@147: // Originating protocol / socket level. cannam@147: cannam@147: inline int getType() const; cannam@147: // Protocol-specific message type. cannam@147: cannam@147: template cannam@147: inline Maybe as(); cannam@147: // Interpret the ancillary message as the given struct type. Most ancillary messages are some cannam@147: // sort of struct, so this is a convenient way to access it. Returns nullptr if the message cannam@147: // is smaller than the struct -- this can happen if the message was truncated due to cannam@147: // insufficient ancillary buffer space. cannam@147: cannam@147: template cannam@147: inline ArrayPtr asArray(); cannam@147: // Interpret the ancillary message as an array of items. If the message size does not evenly cannam@147: // divide into elements of type T, the remainder is discarded -- this can happen if the message cannam@147: // was truncated due to insufficient ancillary buffer space. cannam@147: cannam@147: private: cannam@147: int level; cannam@147: int type; cannam@147: ArrayPtr data; cannam@147: // Message data. In most cases you should use `as()` or `asArray()`. cannam@147: }; cannam@147: cannam@147: class DatagramReceiver { cannam@147: // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's cannam@147: // capacity in advance; if a received packet is larger than the capacity, it will be truncated. cannam@147: cannam@147: public: cannam@147: virtual Promise receive() = 0; cannam@147: // Receive a new message, overwriting this object's content. cannam@147: // cannam@147: // receive() may reuse the same buffers for content and ancillary data with each call. cannam@147: cannam@147: template cannam@147: struct MaybeTruncated { cannam@147: T value; cannam@147: cannam@147: bool isTruncated; cannam@147: // True if the Receiver's capacity was insufficient to receive the value and therefore the cannam@147: // value is truncated. cannam@147: }; cannam@147: cannam@147: virtual MaybeTruncated> getContent() = 0; cannam@147: // Get the content of the datagram. cannam@147: cannam@147: virtual MaybeTruncated> getAncillary() = 0; cannam@147: // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr cannam@147: // struct. Most apps don't need this. cannam@147: // cannam@147: // If the returned value is truncated, then the last message in the array may itself be cannam@147: // truncated, meaning its as() method will return nullptr or its asArray() method will cannam@147: // return fewer elements than expected. Truncation can also mean that additional messages were cannam@147: // available but discarded. cannam@147: cannam@147: virtual NetworkAddress& getSource() = 0; cannam@147: // Get the datagram sender's address. cannam@147: cannam@147: struct Capacity { cannam@147: size_t content = 8192; cannam@147: // How much space to allocate for the datagram content. If a datagram is received that is cannam@147: // larger than this, it will be truncated, with no way to recover the tail. cannam@147: cannam@147: size_t ancillary = 0; cannam@147: // How much space to allocate for ancillary messages. As with content, if the ancillary data cannam@147: // is larger than this, it will be truncated. cannam@147: }; cannam@147: }; cannam@147: cannam@147: class DatagramPort { cannam@147: public: cannam@147: virtual Promise send(const void* buffer, size_t size, NetworkAddress& destination) = 0; cannam@147: virtual Promise send(ArrayPtr> pieces, cannam@147: NetworkAddress& destination) = 0; cannam@147: cannam@147: virtual Own makeReceiver( cannam@147: DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; cannam@147: // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much cannam@147: // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`. cannam@147: cannam@147: virtual uint getPort() = 0; cannam@147: // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't cannam@147: // specify a port when constructing the NetworkAddress -- one will have been assigned cannam@147: // automatically. cannam@147: cannam@147: virtual void getsockopt(int level, int option, void* value, uint* length); cannam@147: virtual void setsockopt(int level, int option, const void* value, uint length); cannam@147: // Same as the methods of AsyncIoStream. cannam@147: }; cannam@147: cannam@147: // ======================================================================================= cannam@147: // Networks cannam@147: cannam@147: class NetworkAddress { cannam@147: // Represents a remote address to which the application can connect. cannam@147: cannam@147: public: cannam@147: virtual Promise> connect() = 0; cannam@147: // Make a new connection to this address. cannam@147: // cannam@147: // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. cannam@147: cannam@147: virtual Own listen() = 0; cannam@147: // Listen for incoming connections on this address. cannam@147: // cannam@147: // The address must be local. cannam@147: cannam@147: virtual Own bindDatagramPort(); cannam@147: // Open this address as a datagram (e.g. UDP) port. cannam@147: // cannam@147: // The address must be local. cannam@147: cannam@147: virtual Own clone() = 0; cannam@147: // Returns an equivalent copy of this NetworkAddress. cannam@147: cannam@147: virtual String toString() = 0; cannam@147: // Produce a human-readable string which hopefully can be passed to Network::parseAddress() cannam@147: // to reproduce this address, although whether or not that works of course depends on the Network cannam@147: // implementation. This should be called only to display the address to human users, who will cannam@147: // hopefully know what they are able to do with it. cannam@147: }; cannam@147: cannam@147: class Network { cannam@147: // Factory for NetworkAddress instances, representing the network services offered by the cannam@147: // operating system. cannam@147: // cannam@147: // This interface typically represents broad authority, and well-designed code should limit its cannam@147: // use to high-level startup code and user interaction. Low-level APIs should accept cannam@147: // NetworkAddress instances directly and work from there, if at all possible. cannam@147: cannam@147: public: cannam@147: virtual Promise> parseAddress(StringPtr addr, uint portHint = 0) = 0; cannam@147: // Construct a network address from a user-provided string. The format of the address cannam@147: // strings is not specified at the API level, and application code should make no assumptions cannam@147: // about them. These strings should always be provided by humans, and said humans will know cannam@147: // what format to use in their particular context. cannam@147: // cannam@147: // `portHint`, if provided, specifies the "standard" IP port number for the application-level cannam@147: // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a cannam@147: // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is cannam@147: // omitted, then the returned address will only support listen() and bindDatagramPort() cannam@147: // (not connect()), and an unused port will be chosen each time one of those methods is called. cannam@147: cannam@147: virtual Own getSockaddr(const void* sockaddr, uint len) = 0; cannam@147: // Construct a network address from a legacy struct sockaddr. cannam@147: }; cannam@147: cannam@147: // ======================================================================================= cannam@147: // I/O Provider cannam@147: cannam@147: class AsyncIoProvider { cannam@147: // Class which constructs asynchronous wrappers around the operating system's I/O facilities. cannam@147: // cannam@147: // Generally, the implementation of this interface must integrate closely with a particular cannam@147: // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide cannam@147: // an AsyncIoProvider. cannam@147: cannam@147: public: cannam@147: virtual OneWayPipe newOneWayPipe() = 0; cannam@147: // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with cannam@147: // the pipe(2) system call). cannam@147: cannam@147: virtual TwoWayPipe newTwoWayPipe() = 0; cannam@147: // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with cannam@147: // socketpair(2) system call). Data written to one end can be read from the other. cannam@147: cannam@147: virtual Network& getNetwork() = 0; cannam@147: // Creates a new `Network` instance representing the networks exposed by the operating system. cannam@147: // cannam@147: // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If cannam@147: // you call this from low-level code, then you are preventing higher-level code from injecting an cannam@147: // alternative implementation. Instead, if your code needs to use network functionality, it cannam@147: // should ask for a `Network` as a constructor or method parameter, so that higher-level code can cannam@147: // chose what implementation to use. The system network is essentially a singleton. See: cannam@147: // http://www.object-oriented-security.org/lets-argue/singletons cannam@147: // cannam@147: // Code that uses the system network should not make any assumptions about what kinds of cannam@147: // addresses it will parse, as this could differ across platforms. String addresses should come cannam@147: // strictly from the user, who will know how to write them correctly for their system. cannam@147: // cannam@147: // With that said, KJ currently supports the following string address formats: cannam@147: // - IPv4: "1.2.3.4", "1.2.3.4:80" cannam@147: // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" cannam@147: // - Local IP wildcard (covers both v4 and v6): "*", "*:80" cannam@147: // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" cannam@147: // - Unix domain: "unix:/path/to/socket" cannam@147: cannam@147: struct PipeThread { cannam@147: // A combination of a thread and a two-way pipe that communicates with that thread. cannam@147: // cannam@147: // The fields are intentionally ordered so that the pipe will be destroyed (and therefore cannam@147: // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread cannam@147: // arranges to exit when it detects disconnect, destruction should be clean. cannam@147: cannam@147: Own thread; cannam@147: Own pipe; cannam@147: }; cannam@147: cannam@147: virtual PipeThread newPipeThread( cannam@147: Function startFunc) = 0; cannam@147: // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate cannam@147: // with it. One end of the pipe is passed to the thread's start function and the other end of cannam@147: // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will cannam@147: // already have an active `EventLoop` when `startFunc` is called. cannam@147: // cannam@147: // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too cannam@147: // much at once but I'm not sure how to cleanly break it down. cannam@147: cannam@147: virtual Timer& getTimer() = 0; cannam@147: // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- cannam@147: // it only updates when the event loop polls for system events. This means that calling `now()` cannam@147: // on this timer does not require a system call. cannam@147: // cannam@147: // This timer is not affected by changes to the system date. It is unspecified whether the timer cannam@147: // continues to count while the system is suspended. cannam@147: }; cannam@147: cannam@147: class LowLevelAsyncIoProvider { cannam@147: // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on cannam@147: // different operating systems. You should prefer to use `AsyncIoProvider` over this interface cannam@147: // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. cannam@147: // cannam@147: // On Unix, this interface can be used to import native file descriptors into the async framework. cannam@147: // Different implementations of this interface might work on top of different event handling cannam@147: // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. cannam@147: // cannam@147: // On Windows, this interface can be used to import native HANDLEs into the async framework. cannam@147: // Different implementations of this interface might work on top of different event handling cannam@147: // primitives, such as I/O completion ports vs. completion routines. cannam@147: // cannam@147: // TODO(port): Actually implement Windows support. cannam@147: cannam@147: public: cannam@147: // --------------------------------------------------------------------------- cannam@147: // Unix-specific stuff cannam@147: cannam@147: enum Flags { cannam@147: // Flags controlling how to wrap a file descriptor. cannam@147: cannam@147: TAKE_OWNERSHIP = 1 << 0, cannam@147: // The returned object should own the file descriptor, automatically closing it when destroyed. cannam@147: // The close-on-exec flag will be set on the descriptor if it is not already. cannam@147: // cannam@147: // If this flag is not used, then the file descriptor is not automatically closed and the cannam@147: // close-on-exec flag is not modified. cannam@147: cannam@147: #if !_WIN32 cannam@147: ALREADY_CLOEXEC = 1 << 1, cannam@147: // Indicates that the close-on-exec flag is known already to be set, so need not be set again. cannam@147: // Only relevant when combined with TAKE_OWNERSHIP. cannam@147: // cannam@147: // On Linux, all system calls which yield new file descriptors have flags or variants which cannam@147: // set the close-on-exec flag immediately. Unfortunately, other OS's do not. cannam@147: cannam@147: ALREADY_NONBLOCK = 1 << 2 cannam@147: // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag cannam@147: // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode cannam@147: // automatically. cannam@147: // cannam@147: // On Linux, all system calls which yield new file descriptors have flags or variants which cannam@147: // enable non-blocking mode immediately. Unfortunately, other OS's do not. cannam@147: #endif cannam@147: }; cannam@147: cannam@147: #if _WIN32 cannam@147: typedef uintptr_t Fd; cannam@147: // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the cannam@147: // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify cannam@147: // explicitly). cannam@147: #else cannam@147: typedef int Fd; cannam@147: // On Unix, any arbitrary file descriptor is supported. cannam@147: #endif cannam@147: cannam@147: virtual Own wrapInputFd(Fd fd, uint flags = 0) = 0; cannam@147: // Create an AsyncInputStream wrapping a file descriptor. cannam@147: // cannam@147: // `flags` is a bitwise-OR of the values of the `Flags` enum. cannam@147: cannam@147: virtual Own wrapOutputFd(Fd fd, uint flags = 0) = 0; cannam@147: // Create an AsyncOutputStream wrapping a file descriptor. cannam@147: // cannam@147: // `flags` is a bitwise-OR of the values of the `Flags` enum. cannam@147: cannam@147: virtual Own wrapSocketFd(Fd fd, uint flags = 0) = 0; cannam@147: // Create an AsyncIoStream wrapping a socket file descriptor. cannam@147: // cannam@147: // `flags` is a bitwise-OR of the values of the `Flags` enum. cannam@147: cannam@147: virtual Promise> wrapConnectingSocketFd( cannam@147: Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0; cannam@147: // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address. cannam@147: // The returned promise does not resolve until connection has completed. cannam@147: // cannam@147: // `flags` is a bitwise-OR of the values of the `Flags` enum. cannam@147: cannam@147: virtual Own wrapListenSocketFd(Fd fd, uint flags = 0) = 0; cannam@147: // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already cannam@147: // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. cannam@147: // cannam@147: // `flags` is a bitwise-OR of the values of the `Flags` enum. cannam@147: cannam@147: virtual Own wrapDatagramSocketFd(Fd fd, uint flags = 0); cannam@147: cannam@147: virtual Timer& getTimer() = 0; cannam@147: // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- cannam@147: // it only updates when the event loop polls for system events. This means that calling `now()` cannam@147: // on this timer does not require a system call. cannam@147: // cannam@147: // This timer is not affected by changes to the system date. It is unspecified whether the timer cannam@147: // continues to count while the system is suspended. cannam@147: }; cannam@147: cannam@147: Own newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); cannam@147: // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. cannam@147: cannam@147: struct AsyncIoContext { cannam@147: Own lowLevelProvider; cannam@147: Own provider; cannam@147: WaitScope& waitScope; cannam@147: cannam@147: #if _WIN32 cannam@147: Win32EventPort& win32EventPort; cannam@147: #else cannam@147: UnixEventPort& unixEventPort; cannam@147: // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This cannam@147: // field will go away at some point when we have a chance to improve these interfaces. cannam@147: #endif cannam@147: }; cannam@147: cannam@147: AsyncIoContext setupAsyncIo(); cannam@147: // Convenience method which sets up the current thread with everything it needs to do async I/O. cannam@147: // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for cannam@147: // doing I/O on the host system, so everything is ready for the thread to start making async calls cannam@147: // and waiting on promises. cannam@147: // cannam@147: // You would typically call this in your main() loop or in the start function of a thread. cannam@147: // Example: cannam@147: // cannam@147: // int main() { cannam@147: // auto ioContext = kj::setupAsyncIo(); cannam@147: // cannam@147: // // Now we can call an async function. cannam@147: // Promise textPromise = getHttp(*ioContext.provider, "http://example.com"); cannam@147: // cannam@147: // // And we can wait for the promise to complete. Note that you can only use `wait()` cannam@147: // // from the top level, not from inside a promise callback. cannam@147: // String text = textPromise.wait(ioContext.waitScope); cannam@147: // print(text); cannam@147: // return 0; cannam@147: // } cannam@147: // cannam@147: // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In cannam@147: // particular, note that after a fork(), an AsyncIoContext created in the parent process will cannam@147: // not work correctly in the child, even if the parent ceases to use its copy. In particular cannam@147: // note that this means that server processes which daemonize themselves at startup must wait cannam@147: // until after daemonization to create an AsyncIoContext. cannam@147: cannam@147: // ======================================================================================= cannam@147: // inline implementation details cannam@147: cannam@147: inline AncillaryMessage::AncillaryMessage( cannam@147: int level, int type, ArrayPtr data) cannam@147: : level(level), type(type), data(data) {} cannam@147: cannam@147: inline int AncillaryMessage::getLevel() const { return level; } cannam@147: inline int AncillaryMessage::getType() const { return type; } cannam@147: cannam@147: template cannam@147: inline Maybe AncillaryMessage::as() { cannam@147: if (data.size() >= sizeof(T)) { cannam@147: return *reinterpret_cast(data.begin()); cannam@147: } else { cannam@147: return nullptr; cannam@147: } cannam@147: } cannam@147: cannam@147: template cannam@147: inline ArrayPtr AncillaryMessage::asArray() { cannam@147: return arrayPtr(reinterpret_cast(data.begin()), data.size() / sizeof(T)); cannam@147: } cannam@147: cannam@147: } // namespace kj cannam@147: cannam@147: #endif // KJ_ASYNC_IO_H_