comparison win64-msvc/include/kj/async-io.h @ 47:d93140aac40b

Current Capnp libs and headers from git
author Chris Cannam
date Thu, 20 Oct 2016 18:15:38 +0100
parents
children 0f2d93caa50c
comparison
equal deleted inserted replaced
46:efe5b9f38b13 47:d93140aac40b
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #ifndef KJ_ASYNC_IO_H_
23 #define KJ_ASYNC_IO_H_
24
25 #if defined(__GNUC__) && !KJ_HEADER_WARNINGS
26 #pragma GCC system_header
27 #endif
28
29 #include "async.h"
30 #include "function.h"
31 #include "thread.h"
32 #include "time.h"
33
34 struct sockaddr;
35
36 namespace kj {
37
38 class UnixEventPort;
39 class NetworkAddress;
40
41 // =======================================================================================
42 // Streaming I/O
43
44 class AsyncInputStream {
45 // Asynchronous equivalent of InputStream (from io.h).
// Abstract interface: the two three-argument methods below are pure virtual and must be supplied
// by implementations; every operation reports its result through a Promise.
46
47 public:
48 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) = 0;
49 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
// NOTE(review): presumably these mirror InputStream::read() / tryRead() from io.h, i.e. fill
// `buffer` with between `minBytes` and `maxBytes` bytes and resolve to the count actually read,
// differing only in how a premature end-of-stream is reported -- confirm against io.h.
50
51 Promise<void> read(void* buffer, size_t bytes);
// Non-virtual convenience overload that takes a single byte count; it is only declared here.
// NOTE(review): presumably reads exactly `bytes` bytes -- confirm against the implementation.
52 };
53
54 class AsyncOutputStream {
55 // Asynchronous equivalent of OutputStream (from io.h).
// Abstract interface: both write() overloads are pure virtual and signal completion through the
// returned Promise<void>.
56
57 public:
58 virtual Promise<void> write(const void* buffer, size_t size) = 0;
// Overload taking one contiguous buffer described by a pointer and a byte count.
// NOTE(review): presumably `buffer` must stay valid until the promise resolves -- confirm.
59 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0;
// Overload taking the data as an array of byte-array pieces rather than one contiguous buffer.
60 };
61
62 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
63 // A combination input and output stream.
// Inherits the read interface of AsyncInputStream and the write interface of AsyncOutputStream,
// and adds shutdown plus optional socket-level operations.
64
65 public:
66 virtual void shutdownWrite() = 0;
67 // Cleanly shut down just the write end of the stream, while keeping the read end open.
68
69 virtual void abortRead() {}
70 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only
71 // be called when an error has occurred.
// The default implementation provided here is an empty body, i.e. a no-op.
72
73 virtual void getsockopt(int level, int option, void* value, uint* length);
74 virtual void setsockopt(int level, int option, const void* value, uint length);
75 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
76 // if the stream is not a socket or the option is not appropriate for the socket type. The
77 // default implementations always throw "unimplemented".
78
79 virtual void getsockname(struct sockaddr* addr, uint* length);
80 virtual void getpeername(struct sockaddr* addr, uint* length);
81 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented"
82 // exception if the stream is not a socket. The default implementations always throw
83 // "unimplemented".
84 //
85 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't
86 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are
87 // ephemeral addresses for a single connection.
88 };
89
90 struct OneWayPipe {
91 // A data pipe with an input end and an output end. (Typically backed by pipe() system call.)
92
93 Own<AsyncInputStream> in;
// The input end: the stream from which the pipe's data is read.
94 Own<AsyncOutputStream> out;
// The output end: the stream into which the pipe's data is written.
95 };
96
97 struct TwoWayPipe {
98 // A data pipe that supports sending in both directions. Each end's output sends data to the
99 // other end's input. (Typically backed by socketpair() system call.)
100
101 Own<AsyncIoStream> ends[2];
// The two endpoints of the pipe, each held as an owned AsyncIoStream.
102 };
103
104 class ConnectionReceiver {
105 // Represents a server socket listening on a port.
106
107 public:
108 virtual Promise<Own<AsyncIoStream>> accept() = 0;
109 // Accept the next incoming connection.
// The promise resolves to an owned AsyncIoStream for the accepted connection.
110
111 virtual uint getPort() = 0;
112 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
113 // specify a port when constructing the NetworkAddress -- one will have been assigned
114 // automatically.
115
116 virtual void getsockopt(int level, int option, void* value, uint* length);
117 virtual void setsockopt(int level, int option, const void* value, uint length);
118 // Same as the methods of AsyncIoStream.
// These two are declared without `= 0`, so subclasses are not required to override them.
119 };
120
121 // =======================================================================================
122 // Datagram I/O
123
124 class AncillaryMessage {
125 // Represents an ancillary message (aka control message) received using the recvmsg() system
126 // call (or equivalent). Most apps will not use this.
127
128 public:
129 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
// Construct a message from its level, type and payload bytes.
130 AncillaryMessage() = default;
// NOTE: `level` and `type` have no default member initializers, so a default-initialized
// AncillaryMessage has indeterminate level/type until it is assigned from another message.
131
132 inline int getLevel() const;
133 // Originating protocol / socket level.
134
135 inline int getType() const;
136 // Protocol-specific message type.
137
138 template <typename T>
139 inline Maybe<const T&> as();
140 // Interpret the ancillary message as the given struct type. Most ancillary messages are some
141 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message
142 // is smaller than the struct -- this can happen if the message was truncated due to
143 // insufficient ancillary buffer space.
// The inline definition in this header reinterpret_casts the start of `data` to `const T*`; it
// compares sizes only and performs no alignment check.
144
145 template <typename T>
146 inline ArrayPtr<const T> asArray();
147 // Interpret the ancillary message as an array of items. If the message size does not evenly
148 // divide into elements of type T, the remainder is discarded -- this can happen if the message
149 // was truncated due to insufficient ancillary buffer space.
150
151 private:
152 int level;
153 int type;
154 ArrayPtr<const byte> data;
155 // Message data. In most cases you should use `as()` or `asArray()`.
// NOTE(review): ArrayPtr is presumably a non-owning view, in which case the underlying buffer
// must outlive this object -- confirm.
156 };
157
158 class DatagramReceiver {
159 // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
160 // capacity in advance; if a received packet is larger than the capacity, it will be truncated.
161
162 public:
163 virtual Promise<void> receive() = 0;
164 // Receive a new message, overwriting this object's content.
165 //
166 // receive() may reuse the same buffers for content and ancillary data with each call.
167
168 template <typename T>
169 struct MaybeTruncated {
// A value of type T paired with a flag saying whether it was cut short.
170 T value;
171
172 bool isTruncated;
173 // True if the Receiver's capacity was insufficient to receive the value and therefore the
174 // value is truncated.
175 };
176
177 virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
178 // Get the content of the datagram.
179
180 virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
181 // Ancillary messages received with the datagram. See the recvmsg() system call and the cmsghdr
182 // struct. Most apps don't need this.
183 //
184 // If the returned value is truncated, then the last message in the array may itself be
185 // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
186 // return fewer elements than expected. Truncation can also mean that additional messages were
187 // available but discarded.
188
189 virtual NetworkAddress& getSource() = 0;
190 // Get the datagram sender's address.
191
192 struct Capacity {
// Buffer sizes for a receiver; this is the parameter type of DatagramPort::makeReceiver().
193 size_t content = 8192;
194 // How much space to allocate for the datagram content. If a datagram is received that is
195 // larger than this, it will be truncated, with no way to recover the tail.
196
197 size_t ancillary = 0;
198 // How much space to allocate for ancillary messages. As with content, if the ancillary data
199 // is larger than this, it will be truncated.
200 };
201 };
202
203 class DatagramPort {
// Abstract interface for a datagram port: sends datagrams to a NetworkAddress and creates
// DatagramReceiver objects for incoming datagrams.
204 public:
205 virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
206 virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
207 NetworkAddress& destination) = 0;
// Send one datagram to `destination`, given either as a single buffer or as an array of pieces.
// NOTE(review): the promised size_t is presumably the number of bytes sent -- confirm.
208
209 virtual Own<DatagramReceiver> makeReceiver(
210 DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
211 // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
212 // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.
213
214 virtual uint getPort() = 0;
215 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
216 // specify a port when constructing the NetworkAddress -- one will have been assigned
217 // automatically.
218
219 virtual void getsockopt(int level, int option, void* value, uint* length);
220 virtual void setsockopt(int level, int option, const void* value, uint length);
221 // Same as the methods of AsyncIoStream.
222 };
223
224 // =======================================================================================
225 // Networks
226
227 class NetworkAddress {
228 // Represents a remote address to which the application can connect.
// The same interface is also used for local addresses, via listen() and bindDatagramPort().
229
230 public:
231 virtual Promise<Own<AsyncIoStream>> connect() = 0;
232 // Make a new connection to this address.
233 //
234 // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number.
235
236 virtual Own<ConnectionReceiver> listen() = 0;
237 // Listen for incoming connections on this address.
238 //
239 // The address must be local.
240
241 virtual Own<DatagramPort> bindDatagramPort();
242 // Open this address as a datagram (e.g. UDP) port.
243 //
244 // The address must be local.
// Unlike its siblings, this method is not pure virtual, so subclasses need not override it.
// NOTE(review): the default implementation presumably reports "unimplemented" -- confirm.
245
246 virtual Own<NetworkAddress> clone() = 0;
247 // Returns an equivalent copy of this NetworkAddress.
248
249 virtual String toString() = 0;
250 // Produce a human-readable string which hopefully can be passed to Network::parseAddress()
251 // to reproduce this address, although whether or not that works of course depends on the Network
252 // implementation. This should be called only to display the address to human users, who will
253 // hopefully know what they are able to do with it.
254 };
255
256 class Network {
257 // Factory for NetworkAddress instances, representing the network services offered by the
258 // operating system.
259 //
260 // This interface typically represents broad authority, and well-designed code should limit its
261 // use to high-level startup code and user interaction. Low-level APIs should accept
262 // NetworkAddress instances directly and work from there, if at all possible.
263
264 public:
265 virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0;
266 // Construct a network address from a user-provided string. The format of the address
267 // strings is not specified at the API level, and application code should make no assumptions
268 // about them. These strings should always be provided by humans, and said humans will know
269 // what format to use in their particular context.
270 //
271 // `portHint`, if provided, specifies the "standard" IP port number for the application-level
272 // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a
273 // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is
274 // omitted, then the returned address will only support listen() and bindDatagramPort()
275 // (not connect()), and an unused port will be chosen each time one of those methods is called.
//
// The result is delivered through a Promise rather than returned directly.
276
277 virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
278 // Construct a network address from a legacy struct sockaddr.
// `sockaddr` is passed as an untyped pointer together with its length `len`.
// NOTE(review): `len` is presumably the size of the structure in bytes -- confirm.
279 };
280
281 // =======================================================================================
282 // I/O Provider
283
284 class AsyncIoProvider {
285 // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
286 //
287 // Generally, the implementation of this interface must integrate closely with a particular
288 // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide
289 // an AsyncIoProvider.
290
291 public:
292 virtual OneWayPipe newOneWayPipe() = 0;
293 // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with
294 // the pipe(2) system call).
295
296 virtual TwoWayPipe newTwoWayPipe() = 0;
297 // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with
298 // socketpair(2) system call). Data written to one end can be read from the other.
299
300 virtual Network& getNetwork() = 0;
301 // Creates a new `Network` instance representing the networks exposed by the operating system.
302 //
303 // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If
304 // you call this from low-level code, then you are preventing higher-level code from injecting an
305 // alternative implementation. Instead, if your code needs to use network functionality, it
306 // should ask for a `Network` as a constructor or method parameter, so that higher-level code can
307 // choose what implementation to use. The system network is essentially a singleton. See:
308 // http://www.object-oriented-security.org/lets-argue/singletons
309 //
310 // Code that uses the system network should not make any assumptions about what kinds of
311 // addresses it will parse, as this could differ across platforms. String addresses should come
312 // strictly from the user, who will know how to write them correctly for their system.
313 //
314 // With that said, KJ currently supports the following string address formats:
315 // - IPv4: "1.2.3.4", "1.2.3.4:80"
316 // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
317 // - Local IP wildcard (covers both v4 and v6): "*", "*:80"
318 // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http"
319 // - Unix domain: "unix:/path/to/socket"
320
321 struct PipeThread {
322 // A combination of a thread and a two-way pipe that communicates with that thread.
323 //
324 // The fields are intentionally ordered so that the pipe will be destroyed (and therefore
325 // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread
326 // arranges to exit when it detects disconnect, destruction should be clean.
//
// (C++ destroys members in reverse order of declaration, which is why `pipe` is declared
// after `thread`. Do not reorder these fields.)
327
328 Own<Thread> thread;
329 Own<AsyncIoStream> pipe;
330 };
331
332 virtual PipeThread newPipeThread(
333 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0;
334 // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate
335 // with it. One end of the pipe is passed to the thread's start function and the other end of
336 // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will
337 // already have an active `EventLoop` when `startFunc` is called.
338 //
339 // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too
340 // much at once but I'm not sure how to cleanly break it down.
341
342 virtual Timer& getTimer() = 0;
343 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
344 // it only updates when the event loop polls for system events. This means that calling `now()`
345 // on this timer does not require a system call.
346 //
347 // This timer is not affected by changes to the system date. It is unspecified whether the timer
348 // continues to count while the system is suspended.
349 };
350
351 class LowLevelAsyncIoProvider {
352 // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on
353 // different operating systems. You should prefer to use `AsyncIoProvider` over this interface
354 // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection.
355 //
356 // On Unix, this interface can be used to import native file descriptors into the async framework.
357 // Different implementations of this interface might work on top of different event handling
358 // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library.
359 //
360 // On Windows, this interface can be used to import native HANDLEs into the async framework.
361 // Different implementations of this interface might work on top of different event handling
362 // primitives, such as I/O completion ports vs. completion routines.
363 //
364 // TODO(port): Actually implement Windows support.
365
366 public:
367 // ---------------------------------------------------------------------------
368 // Unix-specific stuff
369
370 enum Flags {
371 // Flags controlling how to wrap a file descriptor.
// Each enumerator is a distinct bit; the wrap*Fd() methods take them combined in a `uint`.
372
373 TAKE_OWNERSHIP = 1 << 0,
374 // The returned object should own the file descriptor, automatically closing it when destroyed.
375 // The close-on-exec flag will be set on the descriptor if it is not already.
376 //
377 // If this flag is not used, then the file descriptor is not automatically closed and the
378 // close-on-exec flag is not modified.
379
380 ALREADY_CLOEXEC = 1 << 1,
381 // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
382 // Only relevant when combined with TAKE_OWNERSHIP.
383 //
384 // On Linux, all system calls which yield new file descriptors have flags or variants which
385 // set the close-on-exec flag immediately. Unfortunately, other OS's do not.
386
387 ALREADY_NONBLOCK = 1 << 2
388 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag
389 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode
390 // automatically.
391 //
392 // On Linux, all system calls which yield new file descriptors have flags or variants which
393 // enable non-blocking mode immediately. Unfortunately, other OS's do not.
394 };
395
396 virtual Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) = 0;
397 // Create an AsyncInputStream wrapping a file descriptor.
398 //
399 // `flags` is a bitwise-OR of the values of the `Flags` enum.
400
401 virtual Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) = 0;
402 // Create an AsyncOutputStream wrapping a file descriptor.
403 //
404 // `flags` is a bitwise-OR of the values of the `Flags` enum.
405
406 virtual Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) = 0;
407 // Create an AsyncIoStream wrapping a socket file descriptor.
408 //
409 // `flags` is a bitwise-OR of the values of the `Flags` enum.
410
411 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) = 0;
412 // Create an AsyncIoStream wrapping a socket that is in the process of connecting. The returned
413 // promise should not resolve until connection has completed -- traditionally indicated by the
414 // descriptor becoming writable.
415 //
416 // `flags` is a bitwise-OR of the values of the `Flags` enum.
417
418 virtual Own<ConnectionReceiver> wrapListenSocketFd(int fd, uint flags = 0) = 0;
419 // Create a ConnectionReceiver wrapping a listen socket file descriptor. This socket should
420 // already have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
421 //
422 // `flags` is a bitwise-OR of the values of the `Flags` enum.
423
424 virtual Own<DatagramPort> wrapDatagramSocketFd(int fd, uint flags = 0);
// Create a DatagramPort wrapping a datagram socket file descriptor. Unlike the other wrap*Fd()
// methods this one is not pure virtual, so implementations are not forced to override it.
// NOTE(review): `flags` is presumably a bitwise-OR of the `Flags` enum as for its siblings, and
// the default implementation presumably reports "unimplemented" -- confirm against the
// implementation file.
425
426 virtual Timer& getTimer() = 0;
427 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
428 // it only updates when the event loop polls for system events. This means that calling `now()`
429 // on this timer does not require a system call.
430 //
431 // This timer is not affected by changes to the system date. It is unspecified whether the timer
432 // continues to count while the system is suspended.
433 };
434
435 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
436 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`.
// NOTE(review): `lowLevel` is taken by reference, so it presumably must outlive the returned
// provider -- confirm against the implementation.
437
438 struct AsyncIoContext {
// Bundle of objects returned by setupAsyncIo(). Two members are references, so an instance must
// be initialized with all of its fields and cannot be copy-assigned.
439 Own<LowLevelAsyncIoProvider> lowLevelProvider;
// Owned low-level (OS-specific) provider.
440 Own<AsyncIoProvider> provider;
// Owned high-level provider.
441 WaitScope& waitScope;
// Reference to the WaitScope to pass to `Promise::wait()`, as in the setupAsyncIo() example.
442
443 UnixEventPort& unixEventPort;
444 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
445 // field will go away at some point when we have a chance to improve these interfaces.
446 };
447
448 AsyncIoContext setupAsyncIo();
449 // Convenience method which sets up the current thread with everything it needs to do async I/O.
450 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for
451 // doing I/O on the host system, so everything is ready for the thread to start making async calls
452 // and waiting on promises.
453 //
454 // You would typically call this in your main() loop or in the start function of a thread.
455 // Example:
456 //
457 // int main() {
458 // auto ioContext = kj::setupAsyncIo();
459 //
460 // // Now we can call an async function.
461 // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com");
462 //
463 // // And we can wait for the promise to complete. Note that you can only use `wait()`
464 // // from the top level, not from inside a promise callback.
465 // String text = textPromise.wait(ioContext.waitScope);
466 // print(text);
467 // return 0;
468 // }
469 //
470 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In
471 // particular, note that after a fork(), an AsyncIoContext created in the parent process will
472 // not work correctly in the child, even if the parent ceases to use its copy. As a consequence,
473 // server processes which daemonize themselves at startup must wait until after daemonization
474 // to create an AsyncIoContext.
475
476 // =======================================================================================
477 // inline implementation details
478
479 inline AncillaryMessage::AncillaryMessage(
480 int level, int type, ArrayPtr<const byte> data)
481 : level(level), type(type), data(data) {}
// Stores each argument in the member of the same name; the constructor body itself is empty.
// NOTE(review): `data` is copied as an ArrayPtr, presumably a non-owning view, so the bytes it
// refers to must outlive the message -- confirm.
482
483 inline int AncillaryMessage::getLevel() const { return level; }
// Returns the stored `level` member unchanged.
484 inline int AncillaryMessage::getType() const { return type; }
// Returns the stored `type` member unchanged.
485
486 template <typename T>
487 inline Maybe<const T&> AncillaryMessage::as() {
// View the start of the payload as a `const T`. Only the size is checked; the alignment of
// data.begin() for T is not verified here.
488 if (data.size() >= sizeof(T)) {
// Enough bytes for a whole T: return a reference into the existing buffer (no copy is made).
489 return *reinterpret_cast<const T*>(data.begin());
490 } else {
// Payload shorter than T (e.g. truncated message): return nullptr, converted to Maybe.
491 return nullptr;
492 }
493 }
494
495 template <typename T>
496 inline ArrayPtr<const T> AncillaryMessage::asArray() {
// View the payload as an array of `const T`. The element count is data.size() / sizeof(T), so
// any trailing bytes that do not make up a whole T are left out. Alignment is not checked here.
497 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
498 }
499
500 } // namespace kj
501
502 #endif // KJ_ASYNC_IO_H_