annotate win64-msvc/include/kj/async-io.h @ 169:223a55898ab9 tip default

Add null config files
author Chris Cannam <cannam@all-day-breakfast.com>
date Mon, 02 Mar 2020 14:03:47 +0000
parents b4bfdf10c4b3
children
rev   line source
cannam@148 1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
cannam@148 2 // Licensed under the MIT License:
cannam@148 3 //
cannam@148 4 // Permission is hereby granted, free of charge, to any person obtaining a copy
cannam@148 5 // of this software and associated documentation files (the "Software"), to deal
cannam@148 6 // in the Software without restriction, including without limitation the rights
cannam@148 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
cannam@148 8 // copies of the Software, and to permit persons to whom the Software is
cannam@148 9 // furnished to do so, subject to the following conditions:
cannam@148 10 //
cannam@148 11 // The above copyright notice and this permission notice shall be included in
cannam@148 12 // all copies or substantial portions of the Software.
cannam@148 13 //
cannam@148 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
cannam@148 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
cannam@148 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
cannam@148 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
cannam@148 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
cannam@148 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
cannam@148 20 // THE SOFTWARE.
cannam@148 21
cannam@148 22 #ifndef KJ_ASYNC_IO_H_
cannam@148 23 #define KJ_ASYNC_IO_H_
cannam@148 24
cannam@148 25 #if defined(__GNUC__) && !KJ_HEADER_WARNINGS
cannam@148 26 #pragma GCC system_header
cannam@148 27 #endif
cannam@148 28
cannam@148 29 #include "async.h"
cannam@148 30 #include "function.h"
cannam@148 31 #include "thread.h"
cannam@148 32 #include "time.h"
cannam@148 33
cannam@148 34 struct sockaddr;
cannam@148 35
cannam@148 36 namespace kj {
cannam@148 37
cannam@148 38 #if _WIN32
cannam@148 39 class Win32EventPort;
cannam@148 40 #else
cannam@148 41 class UnixEventPort;
cannam@148 42 #endif
cannam@148 43
cannam@148 44 class NetworkAddress;
cannam@148 45 class AsyncOutputStream;
cannam@148 46
cannam@148 47 // =======================================================================================
cannam@148 48 // Streaming I/O
cannam@148 49
cannam@148 50 class AsyncInputStream {
cannam@148 51 // Asynchronous equivalent of InputStream (from io.h).
cannam@148 52
cannam@148 53 public:
cannam@148 54 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes);
cannam@148 55 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
cannam@148 56
cannam@148 57 Promise<void> read(void* buffer, size_t bytes);
cannam@148 58
cannam@148 59 virtual Maybe<uint64_t> tryGetLength();
cannam@148 60 // Get the remaining number of bytes that will be produced by this stream, if known.
cannam@148 61 //
cannam@148 62 // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
cannam@148 63 // HTTP implementation may need to fall back to Transfer-Encoding: chunked.
cannam@148 64 //
cannam@148 65 // The default implementation always returns null.
cannam@148 66
cannam@148 67 virtual Promise<uint64_t> pumpTo(
cannam@148 68 AsyncOutputStream& output, uint64_t amount = kj::maxValue);
cannam@148 69 // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
cannam@148 70 // total bytes actually pumped (which is only less than `amount` if EOF was reached).
cannam@148 71 //
cannam@148 72 // Override this if your stream type knows how to pump itself to certain kinds of output
cannam@148 73 // streams more efficiently than via the naive approach. You can use
cannam@148 74 // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
cannam@148 75 // delegate to the default implementation.
cannam@148 76 //
cannam@148 77 // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
cannam@148 78 // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.
cannam@148 79
cannam@148 80 Promise<Array<byte>> readAllBytes();
cannam@148 81 Promise<String> readAllText();
cannam@148 82 // Read until EOF and return as one big byte array or string.
cannam@148 83 };
cannam@148 84
cannam@148 85 class AsyncOutputStream {
cannam@148 86 // Asynchronous equivalent of OutputStream (from io.h).
cannam@148 87
cannam@148 88 public:
cannam@148 89 virtual Promise<void> write(const void* buffer, size_t size) = 0;
cannam@148 90 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0;
cannam@148 91
cannam@148 92 virtual Maybe<Promise<uint64_t>> tryPumpFrom(
cannam@148 93 AsyncInputStream& input, uint64_t amount = kj::maxValue);
cannam@148 94 // Implements double-dispatch for AsyncInputStream::pumpTo().
cannam@148 95 //
cannam@148 96 // This method should only be called from within an implementation of pumpTo().
cannam@148 97 //
cannam@148 98 // This method examines the type of `input` to find optimized ways to pump data from it to this
cannam@148 99 // output stream. If it finds one, it performs the pump. Otherwise, it returns null.
cannam@148 100 //
cannam@148 101 // The default implementation always returns null.
cannam@148 102 };
cannam@148 103
cannam@148 104 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
cannam@148 105 // A combination input and output stream.
cannam@148 106
cannam@148 107 public:
cannam@148 108 virtual void shutdownWrite() = 0;
cannam@148 109 // Cleanly shut down just the write end of the stream, while keeping the read end open.
cannam@148 110
cannam@148 111 virtual void abortRead() {}
cannam@148 112 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only
cannam@148 113 // be called when an error has occurred.
cannam@148 114
cannam@148 115 virtual void getsockopt(int level, int option, void* value, uint* length);
cannam@148 116 virtual void setsockopt(int level, int option, const void* value, uint length);
cannam@148 117 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
cannam@148 118 // if the stream is not a socket or the option is not appropriate for the socket type. The
cannam@148 119 // default implementations always throw "unimplemented".
cannam@148 120
cannam@148 121 virtual void getsockname(struct sockaddr* addr, uint* length);
cannam@148 122 virtual void getpeername(struct sockaddr* addr, uint* length);
cannam@148 123 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented"
cannam@148 124 // exception if the stream is not a socket. The default implementations always throw
cannam@148 125 // "unimplemented".
cannam@148 126 //
cannam@148 127 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't
cannam@148 128 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are
cannam@148 129 // ephemeral addresses for a single connection.
cannam@148 130 };
cannam@148 131
cannam@148 132 struct OneWayPipe {
cannam@148 133 // A data pipe with an input end and an output end. (Typically backed by pipe() system call.)
cannam@148 134
cannam@148 135 Own<AsyncInputStream> in;
cannam@148 136 Own<AsyncOutputStream> out;
cannam@148 137 };
cannam@148 138
cannam@148 139 struct TwoWayPipe {
cannam@148 140 // A data pipe that supports sending in both directions. Each end's output sends data to the
cannam@148 141 // other end's input. (Typically backed by socketpair() system call.)
cannam@148 142
cannam@148 143 Own<AsyncIoStream> ends[2];
cannam@148 144 };
cannam@148 145
cannam@148 146 class ConnectionReceiver {
cannam@148 147 // Represents a server socket listening on a port.
cannam@148 148
cannam@148 149 public:
cannam@148 150 virtual Promise<Own<AsyncIoStream>> accept() = 0;
cannam@148 151 // Accept the next incoming connection.
cannam@148 152
cannam@148 153 virtual uint getPort() = 0;
cannam@148 154 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
cannam@148 155 // specify a port when constructing the NetworkAddress -- one will have been assigned
cannam@148 156 // automatically.
cannam@148 157
cannam@148 158 virtual void getsockopt(int level, int option, void* value, uint* length);
cannam@148 159 virtual void setsockopt(int level, int option, const void* value, uint length);
cannam@148 160 // Same as the methods of AsyncIoStream.
cannam@148 161 };
cannam@148 162
cannam@148 163 // =======================================================================================
cannam@148 164 // Datagram I/O
cannam@148 165
cannam@148 166 class AncillaryMessage {
cannam@148 167 // Represents an ancillary message (aka control message) received using the recvmsg() system
cannam@148 168 // call (or equivalent). Most apps will not use this.
cannam@148 169
cannam@148 170 public:
cannam@148 171 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
cannam@148 172 AncillaryMessage() = default;
cannam@148 173
cannam@148 174 inline int getLevel() const;
cannam@148 175 // Originating protocol / socket level.
cannam@148 176
cannam@148 177 inline int getType() const;
cannam@148 178 // Protocol-specific message type.
cannam@148 179
cannam@148 180 template <typename T>
cannam@148 181 inline Maybe<const T&> as();
cannam@148 182 // Interpret the ancillary message as the given struct type. Most ancillary messages are some
cannam@148 183 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message
cannam@148 184 // is smaller than the struct -- this can happen if the message was truncated due to
cannam@148 185 // insufficient ancillary buffer space.
cannam@148 186
cannam@148 187 template <typename T>
cannam@148 188 inline ArrayPtr<const T> asArray();
cannam@148 189 // Interpret the ancillary message as an array of items. If the message size does not evenly
cannam@148 190 // divide into elements of type T, the remainder is discarded -- this can happen if the message
cannam@148 191 // was truncated due to insufficient ancillary buffer space.
cannam@148 192
cannam@148 193 private:
cannam@148 194 int level;
cannam@148 195 int type;
cannam@148 196 ArrayPtr<const byte> data;
cannam@148 197 // Message data. In most cases you should use `as()` or `asArray()`.
cannam@148 198 };
cannam@148 199
cannam@148 200 class DatagramReceiver {
cannam@148 201 // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
cannam@148 202 // capacity in advance; if a received packet is larger than the capacity, it will be truncated.
cannam@148 203
cannam@148 204 public:
cannam@148 205 virtual Promise<void> receive() = 0;
cannam@148 206 // Receive a new message, overwriting this object's content.
cannam@148 207 //
cannam@148 208 // receive() may reuse the same buffers for content and ancillary data with each call.
cannam@148 209
cannam@148 210 template <typename T>
cannam@148 211 struct MaybeTruncated {
cannam@148 212 T value;
cannam@148 213
cannam@148 214 bool isTruncated;
cannam@148 215 // True if the Receiver's capacity was insufficient to receive the value and therefore the
cannam@148 216 // value is truncated.
cannam@148 217 };
cannam@148 218
cannam@148 219 virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
cannam@148 220 // Get the content of the datagram.
cannam@148 221
cannam@148 222 virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
cannam@148 223 // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr
cannam@148 224 // struct. Most apps don't need this.
cannam@148 225 //
cannam@148 226 // If the returned value is truncated, then the last message in the array may itself be
cannam@148 227 // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
cannam@148 228 // return fewer elements than expected. Truncation can also mean that additional messages were
cannam@148 229 // available but discarded.
cannam@148 230
cannam@148 231 virtual NetworkAddress& getSource() = 0;
cannam@148 232 // Get the datagram sender's address.
cannam@148 233
cannam@148 234 struct Capacity {
cannam@148 235 size_t content = 8192;
cannam@148 236 // How much space to allocate for the datagram content. If a datagram is received that is
cannam@148 237 // larger than this, it will be truncated, with no way to recover the tail.
cannam@148 238
cannam@148 239 size_t ancillary = 0;
cannam@148 240 // How much space to allocate for ancillary messages. As with content, if the ancillary data
cannam@148 241 // is larger than this, it will be truncated.
cannam@148 242 };
cannam@148 243 };
cannam@148 244
cannam@148 245 class DatagramPort {
cannam@148 246 public:
cannam@148 247 virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
cannam@148 248 virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
cannam@148 249 NetworkAddress& destination) = 0;
cannam@148 250
cannam@148 251 virtual Own<DatagramReceiver> makeReceiver(
cannam@148 252 DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
cannam@148 253 // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
cannam@148 254 // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.
cannam@148 255
cannam@148 256 virtual uint getPort() = 0;
cannam@148 257 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
cannam@148 258 // specify a port when constructing the NetworkAddress -- one will have been assigned
cannam@148 259 // automatically.
cannam@148 260
cannam@148 261 virtual void getsockopt(int level, int option, void* value, uint* length);
cannam@148 262 virtual void setsockopt(int level, int option, const void* value, uint length);
cannam@148 263 // Same as the methods of AsyncIoStream.
cannam@148 264 };
cannam@148 265
cannam@148 266 // =======================================================================================
cannam@148 267 // Networks
cannam@148 268
cannam@148 269 class NetworkAddress {
cannam@148 270 // Represents a remote address to which the application can connect.
cannam@148 271
cannam@148 272 public:
cannam@148 273 virtual Promise<Own<AsyncIoStream>> connect() = 0;
cannam@148 274 // Make a new connection to this address.
cannam@148 275 //
cannam@148 276 // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number.
cannam@148 277
cannam@148 278 virtual Own<ConnectionReceiver> listen() = 0;
cannam@148 279 // Listen for incoming connections on this address.
cannam@148 280 //
cannam@148 281 // The address must be local.
cannam@148 282
cannam@148 283 virtual Own<DatagramPort> bindDatagramPort();
cannam@148 284 // Open this address as a datagram (e.g. UDP) port.
cannam@148 285 //
cannam@148 286 // The address must be local.
cannam@148 287
cannam@148 288 virtual Own<NetworkAddress> clone() = 0;
cannam@148 289 // Returns an equivalent copy of this NetworkAddress.
cannam@148 290
cannam@148 291 virtual String toString() = 0;
cannam@148 292 // Produce a human-readable string which hopefully can be passed to Network::parseAddress()
cannam@148 293 // to reproduce this address, although whether or not that works of course depends on the Network
cannam@148 294 // implementation. This should be called only to display the address to human users, who will
cannam@148 295 // hopefully know what they are able to do with it.
cannam@148 296 };
cannam@148 297
cannam@148 298 class Network {
cannam@148 299 // Factory for NetworkAddress instances, representing the network services offered by the
cannam@148 300 // operating system.
cannam@148 301 //
cannam@148 302 // This interface typically represents broad authority, and well-designed code should limit its
cannam@148 303 // use to high-level startup code and user interaction. Low-level APIs should accept
cannam@148 304 // NetworkAddress instances directly and work from there, if at all possible.
cannam@148 305
cannam@148 306 public:
cannam@148 307 virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0;
cannam@148 308 // Construct a network address from a user-provided string. The format of the address
cannam@148 309 // strings is not specified at the API level, and application code should make no assumptions
cannam@148 310 // about them. These strings should always be provided by humans, and said humans will know
cannam@148 311 // what format to use in their particular context.
cannam@148 312 //
cannam@148 313 // `portHint`, if provided, specifies the "standard" IP port number for the application-level
cannam@148 314 // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a
cannam@148 315 // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is
cannam@148 316 // omitted, then the returned address will only support listen() and bindDatagramPort()
cannam@148 317 // (not connect()), and an unused port will be chosen each time one of those methods is called.
cannam@148 318
cannam@148 319 virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
cannam@148 320 // Construct a network address from a legacy struct sockaddr.
cannam@148 321 };
cannam@148 322
cannam@148 323 // =======================================================================================
cannam@148 324 // I/O Provider
cannam@148 325
cannam@148 326 class AsyncIoProvider {
cannam@148 327 // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
cannam@148 328 //
cannam@148 329 // Generally, the implementation of this interface must integrate closely with a particular
cannam@148 330 // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide
cannam@148 331 // an AsyncIoProvider.
cannam@148 332
cannam@148 333 public:
cannam@148 334 virtual OneWayPipe newOneWayPipe() = 0;
cannam@148 335 // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with
cannam@148 336 // the pipe(2) system call).
cannam@148 337
cannam@148 338 virtual TwoWayPipe newTwoWayPipe() = 0;
cannam@148 339 // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with
cannam@148 340 // socketpair(2) system call). Data written to one end can be read from the other.
cannam@148 341
cannam@148 342 virtual Network& getNetwork() = 0;
cannam@148 343 // Creates a new `Network` instance representing the networks exposed by the operating system.
cannam@148 344 //
cannam@148 345 // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If
cannam@148 346 // you call this from low-level code, then you are preventing higher-level code from injecting an
cannam@148 347 // alternative implementation. Instead, if your code needs to use network functionality, it
cannam@148 348 // should ask for a `Network` as a constructor or method parameter, so that higher-level code can
cannam@148 349 // chose what implementation to use. The system network is essentially a singleton. See:
cannam@148 350 // http://www.object-oriented-security.org/lets-argue/singletons
cannam@148 351 //
cannam@148 352 // Code that uses the system network should not make any assumptions about what kinds of
cannam@148 353 // addresses it will parse, as this could differ across platforms. String addresses should come
cannam@148 354 // strictly from the user, who will know how to write them correctly for their system.
cannam@148 355 //
cannam@148 356 // With that said, KJ currently supports the following string address formats:
cannam@148 357 // - IPv4: "1.2.3.4", "1.2.3.4:80"
cannam@148 358 // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
cannam@148 359 // - Local IP wildcard (covers both v4 and v6): "*", "*:80"
cannam@148 360 // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http"
cannam@148 361 // - Unix domain: "unix:/path/to/socket"
cannam@148 362
cannam@148 363 struct PipeThread {
cannam@148 364 // A combination of a thread and a two-way pipe that communicates with that thread.
cannam@148 365 //
cannam@148 366 // The fields are intentionally ordered so that the pipe will be destroyed (and therefore
cannam@148 367 // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread
cannam@148 368 // arranges to exit when it detects disconnect, destruction should be clean.
cannam@148 369
cannam@148 370 Own<Thread> thread;
cannam@148 371 Own<AsyncIoStream> pipe;
cannam@148 372 };
cannam@148 373
cannam@148 374 virtual PipeThread newPipeThread(
cannam@148 375 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0;
cannam@148 376 // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate
cannam@148 377 // with it. One end of the pipe is passed to the thread's start function and the other end of
cannam@148 378 // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will
cannam@148 379 // already have an active `EventLoop` when `startFunc` is called.
cannam@148 380 //
cannam@148 381 // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too
cannam@148 382 // much at once but I'm not sure how to cleanly break it down.
cannam@148 383
cannam@148 384 virtual Timer& getTimer() = 0;
cannam@148 385 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
cannam@148 386 // it only updates when the event loop polls for system events. This means that calling `now()`
cannam@148 387 // on this timer does not require a system call.
cannam@148 388 //
cannam@148 389 // This timer is not affected by changes to the system date. It is unspecified whether the timer
cannam@148 390 // continues to count while the system is suspended.
cannam@148 391 };
cannam@148 392
cannam@148 393 class LowLevelAsyncIoProvider {
cannam@148 394 // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on
cannam@148 395 // different operating systems. You should prefer to use `AsyncIoProvider` over this interface
cannam@148 396 // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection.
cannam@148 397 //
cannam@148 398 // On Unix, this interface can be used to import native file descriptors into the async framework.
cannam@148 399 // Different implementations of this interface might work on top of different event handling
cannam@148 400 // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library.
cannam@148 401 //
cannam@148 402 // On Windows, this interface can be used to import native HANDLEs into the async framework.
cannam@148 403 // Different implementations of this interface might work on top of different event handling
cannam@148 404 // primitives, such as I/O completion ports vs. completion routines.
cannam@148 405 //
cannam@148 406 // TODO(port): Actually implement Windows support.
cannam@148 407
cannam@148 408 public:
cannam@148 409 // ---------------------------------------------------------------------------
cannam@148 410 // Unix-specific stuff
cannam@148 411
cannam@148 412 enum Flags {
cannam@148 413 // Flags controlling how to wrap a file descriptor.
cannam@148 414
cannam@148 415 TAKE_OWNERSHIP = 1 << 0,
cannam@148 416 // The returned object should own the file descriptor, automatically closing it when destroyed.
cannam@148 417 // The close-on-exec flag will be set on the descriptor if it is not already.
cannam@148 418 //
cannam@148 419 // If this flag is not used, then the file descriptor is not automatically closed and the
cannam@148 420 // close-on-exec flag is not modified.
cannam@148 421
cannam@148 422 #if !_WIN32
cannam@148 423 ALREADY_CLOEXEC = 1 << 1,
cannam@148 424 // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
cannam@148 425 // Only relevant when combined with TAKE_OWNERSHIP.
cannam@148 426 //
cannam@148 427 // On Linux, all system calls which yield new file descriptors have flags or variants which
cannam@148 428 // set the close-on-exec flag immediately. Unfortunately, other OS's do not.
cannam@148 429
cannam@148 430 ALREADY_NONBLOCK = 1 << 2
cannam@148 431 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag
cannam@148 432 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode
cannam@148 433 // automatically.
cannam@148 434 //
cannam@148 435 // On Linux, all system calls which yield new file descriptors have flags or variants which
cannam@148 436 // enable non-blocking mode immediately. Unfortunately, other OS's do not.
cannam@148 437 #endif
cannam@148 438 };
cannam@148 439
cannam@148 440 #if _WIN32
cannam@148 441 typedef uintptr_t Fd;
cannam@148 442 // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the
cannam@148 443 // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify
cannam@148 444 // explicitly).
cannam@148 445 #else
cannam@148 446 typedef int Fd;
cannam@148 447 // On Unix, any arbitrary file descriptor is supported.
cannam@148 448 #endif
cannam@148 449
cannam@148 450 virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0;
cannam@148 451 // Create an AsyncInputStream wrapping a file descriptor.
cannam@148 452 //
cannam@148 453 // `flags` is a bitwise-OR of the values of the `Flags` enum.
cannam@148 454
cannam@148 455 virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0;
cannam@148 456 // Create an AsyncOutputStream wrapping a file descriptor.
cannam@148 457 //
cannam@148 458 // `flags` is a bitwise-OR of the values of the `Flags` enum.
cannam@148 459
cannam@148 460 virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0;
cannam@148 461 // Create an AsyncIoStream wrapping a socket file descriptor.
cannam@148 462 //
cannam@148 463 // `flags` is a bitwise-OR of the values of the `Flags` enum.
cannam@148 464
cannam@148 465 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
cannam@148 466 Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0;
cannam@148 467 // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address.
cannam@148 468 // The returned promise does not resolve until connection has completed.
cannam@148 469 //
cannam@148 470 // `flags` is a bitwise-OR of the values of the `Flags` enum.
cannam@148 471
cannam@148 472 virtual Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) = 0;
cannam@148 473 // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already
cannam@148 474 // have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
cannam@148 475 //
cannam@148 476 // `flags` is a bitwise-OR of the values of the `Flags` enum.
cannam@148 477
cannam@148 478 virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0);
cannam@148 479
cannam@148 480 virtual Timer& getTimer() = 0;
cannam@148 481 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
cannam@148 482 // it only updates when the event loop polls for system events. This means that calling `now()`
cannam@148 483 // on this timer does not require a system call.
cannam@148 484 //
cannam@148 485 // This timer is not affected by changes to the system date. It is unspecified whether the timer
cannam@148 486 // continues to count while the system is suspended.
cannam@148 487 };
cannam@148 488
cannam@148 489 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
cannam@148 490 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`.
cannam@148 491
cannam@148 492 struct AsyncIoContext {
cannam@148 493 Own<LowLevelAsyncIoProvider> lowLevelProvider;
cannam@148 494 Own<AsyncIoProvider> provider;
cannam@148 495 WaitScope& waitScope;
cannam@148 496
cannam@148 497 #if _WIN32
cannam@148 498 Win32EventPort& win32EventPort;
cannam@148 499 #else
cannam@148 500 UnixEventPort& unixEventPort;
cannam@148 501 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
cannam@148 502 // field will go away at some point when we have a chance to improve these interfaces.
cannam@148 503 #endif
cannam@148 504 };
cannam@148 505
cannam@148 506 AsyncIoContext setupAsyncIo();
cannam@148 507 // Convenience method which sets up the current thread with everything it needs to do async I/O.
cannam@148 508 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for
cannam@148 509 // doing I/O on the host system, so everything is ready for the thread to start making async calls
cannam@148 510 // and waiting on promises.
cannam@148 511 //
cannam@148 512 // You would typically call this in your main() loop or in the start function of a thread.
cannam@148 513 // Example:
cannam@148 514 //
cannam@148 515 // int main() {
cannam@148 516 // auto ioContext = kj::setupAsyncIo();
cannam@148 517 //
cannam@148 518 // // Now we can call an async function.
cannam@148 519 // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com");
cannam@148 520 //
cannam@148 521 // // And we can wait for the promise to complete. Note that you can only use `wait()`
cannam@148 522 // // from the top level, not from inside a promise callback.
cannam@148 523 // String text = textPromise.wait(ioContext.waitScope);
cannam@148 524 // print(text);
cannam@148 525 // return 0;
cannam@148 526 // }
cannam@148 527 //
cannam@148 528 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In
cannam@148 529 // particular, note that after a fork(), an AsyncIoContext created in the parent process will
cannam@148 530 // not work correctly in the child, even if the parent ceases to use its copy. In particular
cannam@148 531 // note that this means that server processes which daemonize themselves at startup must wait
cannam@148 532 // until after daemonization to create an AsyncIoContext.
cannam@148 533
cannam@148 534 // =======================================================================================
cannam@148 535 // inline implementation details
cannam@148 536
cannam@148 537 inline AncillaryMessage::AncillaryMessage(
cannam@148 538 int level, int type, ArrayPtr<const byte> data)
cannam@148 539 : level(level), type(type), data(data) {}
cannam@148 540
cannam@148 541 inline int AncillaryMessage::getLevel() const { return level; }
cannam@148 542 inline int AncillaryMessage::getType() const { return type; }
cannam@148 543
cannam@148 544 template <typename T>
cannam@148 545 inline Maybe<const T&> AncillaryMessage::as() {
cannam@148 546 if (data.size() >= sizeof(T)) {
cannam@148 547 return *reinterpret_cast<const T*>(data.begin());
cannam@148 548 } else {
cannam@148 549 return nullptr;
cannam@148 550 }
cannam@148 551 }
cannam@148 552
cannam@148 553 template <typename T>
cannam@148 554 inline ArrayPtr<const T> AncillaryMessage::asArray() {
cannam@148 555 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
cannam@148 556 }
cannam@148 557
cannam@148 558 } // namespace kj
cannam@148 559
cannam@148 560 #endif // KJ_ASYNC_IO_H_