Mercurial > hg > sv-dependency-builds
comparison win64-msvc/include/kj/async-io.h @ 47:d93140aac40b
Current Capnp libs and headers from git
author | Chris Cannam |
---|---|
date | Thu, 20 Oct 2016 18:15:38 +0100 |
parents | |
children | 0f2d93caa50c |
comparison
equal
deleted
inserted
replaced
46:efe5b9f38b13 | 47:d93140aac40b |
---|---|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
2 // Licensed under the MIT License: | |
3 // | |
4 // Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 // of this software and associated documentation files (the "Software"), to deal | |
6 // in the Software without restriction, including without limitation the rights | |
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 // copies of the Software, and to permit persons to whom the Software is | |
9 // furnished to do so, subject to the following conditions: | |
10 // | |
11 // The above copyright notice and this permission notice shall be included in | |
12 // all copies or substantial portions of the Software. | |
13 // | |
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
20 // THE SOFTWARE. | |
21 | |
22 #ifndef KJ_ASYNC_IO_H_ | |
23 #define KJ_ASYNC_IO_H_ | |
24 | |
25 #if defined(__GNUC__) && !KJ_HEADER_WARNINGS | |
26 #pragma GCC system_header | |
27 #endif | |
28 | |
29 #include "async.h" | |
30 #include "function.h" | |
31 #include "thread.h" | |
32 #include "time.h" | |
33 | |
34 struct sockaddr; | |
35 | |
36 namespace kj { | |
37 | |
38 class UnixEventPort; | |
39 class NetworkAddress; | |
40 | |
41 // ======================================================================================= | |
42 // Streaming I/O | |
43 | |
44 class AsyncInputStream { | |
45 // Asynchronous equivalent of InputStream (from io.h). | |
46 | |
47 public: | |
48 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) = 0; | |
49 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; | |
50 | |
51 Promise<void> read(void* buffer, size_t bytes); | |
52 }; | |
53 | |
54 class AsyncOutputStream { | |
55 // Asynchronous equivalent of OutputStream (from io.h). | |
56 | |
57 public: | |
58 virtual Promise<void> write(const void* buffer, size_t size) = 0; | |
59 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0; | |
60 }; | |
61 | |
62 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { | |
63 // A combination input and output stream. | |
64 | |
65 public: | |
66 virtual void shutdownWrite() = 0; | |
67 // Cleanly shut down just the write end of the stream, while keeping the read end open. | |
68 | |
69 virtual void abortRead() {} | |
70 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only | |
71 // be called when an error has occurred. | |
72 | |
73 virtual void getsockopt(int level, int option, void* value, uint* length); | |
74 virtual void setsockopt(int level, int option, const void* value, uint length); | |
75 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception | |
76 // if the stream is not a socket or the option is not appropriate for the socket type. The | |
77 // default implementations always throw "unimplemented". | |
78 | |
79 virtual void getsockname(struct sockaddr* addr, uint* length); | |
80 virtual void getpeername(struct sockaddr* addr, uint* length); | |
81 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" | |
82 // exception if the stream is not a socket. The default implementations always throw | |
83 // "unimplemented". | |
84 // | |
85 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't | |
86 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are | |
87 // ephemeral addresses for a single connection. | |
88 }; | |
89 | |
90 struct OneWayPipe { | |
91 // A data pipe with an input end and an output end. (Typically backed by pipe() system call.) | |
92 | |
93 Own<AsyncInputStream> in; | |
94 Own<AsyncOutputStream> out; | |
95 }; | |
96 | |
97 struct TwoWayPipe { | |
98 // A data pipe that supports sending in both directions. Each end's output sends data to the | |
99 // other end's input. (Typically backed by socketpair() system call.) | |
100 | |
101 Own<AsyncIoStream> ends[2]; | |
102 }; | |
103 | |
104 class ConnectionReceiver { | |
105 // Represents a server socket listening on a port. | |
106 | |
107 public: | |
108 virtual Promise<Own<AsyncIoStream>> accept() = 0; | |
109 // Accept the next incoming connection. | |
110 | |
111 virtual uint getPort() = 0; | |
112 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't | |
113 // specify a port when constructing the NetworkAddress -- one will have been assigned | |
114 // automatically. | |
115 | |
116 virtual void getsockopt(int level, int option, void* value, uint* length); | |
117 virtual void setsockopt(int level, int option, const void* value, uint length); | |
118 // Same as the methods of AsyncIoStream. | |
119 }; | |
120 | |
121 // ======================================================================================= | |
122 // Datagram I/O | |
123 | |
124 class AncillaryMessage { | |
125 // Represents an ancillary message (aka control message) received using the recvmsg() system | |
126 // call (or equivalent). Most apps will not use this. | |
127 | |
128 public: | |
129 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data); | |
130 AncillaryMessage() = default; | |
131 | |
132 inline int getLevel() const; | |
133 // Originating protocol / socket level. | |
134 | |
135 inline int getType() const; | |
136 // Protocol-specific message type. | |
137 | |
138 template <typename T> | |
139 inline Maybe<const T&> as(); | |
140 // Interpret the ancillary message as the given struct type. Most ancillary messages are some | |
141 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message | |
142 // is smaller than the struct -- this can happen if the message was truncated due to | |
143 // insufficient ancillary buffer space. | |
144 | |
145 template <typename T> | |
146 inline ArrayPtr<const T> asArray(); | |
147 // Interpret the ancillary message as an array of items. If the message size does not evenly | |
148 // divide into elements of type T, the remainder is discarded -- this can happen if the message | |
149 // was truncated due to insufficient ancillary buffer space. | |
150 | |
151 private: | |
152 int level; | |
153 int type; | |
154 ArrayPtr<const byte> data; | |
155 // Message data. In most cases you should use `as()` or `asArray()`. | |
156 }; | |
157 | |
158 class DatagramReceiver { | |
159 // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's | |
160 // capacity in advance; if a received packet is larger than the capacity, it will be truncated. | |
161 | |
162 public: | |
163 virtual Promise<void> receive() = 0; | |
164 // Receive a new message, overwriting this object's content. | |
165 // | |
166 // receive() may reuse the same buffers for content and ancillary data with each call. | |
167 | |
168 template <typename T> | |
169 struct MaybeTruncated { | |
170 T value; | |
171 | |
172 bool isTruncated; | |
173 // True if the Receiver's capacity was insufficient to receive the value and therefore the | |
174 // value is truncated. | |
175 }; | |
176 | |
177 virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0; | |
178 // Get the content of the datagram. | |
179 | |
180 virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0; | |
181 // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr | |
182 // struct. Most apps don't need this. | |
183 // | |
184 // If the returned value is truncated, then the last message in the array may itself be | |
185 // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will | |
186 // return fewer elements than expected. Truncation can also mean that additional messages were | |
187 // available but discarded. | |
188 | |
189 virtual NetworkAddress& getSource() = 0; | |
190 // Get the datagram sender's address. | |
191 | |
192 struct Capacity { | |
193 size_t content = 8192; | |
194 // How much space to allocate for the datagram content. If a datagram is received that is | |
195 // larger than this, it will be truncated, with no way to recover the tail. | |
196 | |
197 size_t ancillary = 0; | |
198 // How much space to allocate for ancillary messages. As with content, if the ancillary data | |
199 // is larger than this, it will be truncated. | |
200 }; | |
201 }; | |
202 | |
203 class DatagramPort { | |
204 public: | |
205 virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0; | |
206 virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces, | |
207 NetworkAddress& destination) = 0; | |
208 | |
209 virtual Own<DatagramReceiver> makeReceiver( | |
210 DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; | |
211 // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much | |
212 // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`. | |
213 | |
214 virtual uint getPort() = 0; | |
215 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't | |
216 // specify a port when constructing the NetworkAddress -- one will have been assigned | |
217 // automatically. | |
218 | |
219 virtual void getsockopt(int level, int option, void* value, uint* length); | |
220 virtual void setsockopt(int level, int option, const void* value, uint length); | |
221 // Same as the methods of AsyncIoStream. | |
222 }; | |
223 | |
224 // ======================================================================================= | |
225 // Networks | |
226 | |
227 class NetworkAddress { | |
228 // Represents a remote address to which the application can connect. | |
229 | |
230 public: | |
231 virtual Promise<Own<AsyncIoStream>> connect() = 0; | |
232 // Make a new connection to this address. | |
233 // | |
234 // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. | |
235 | |
236 virtual Own<ConnectionReceiver> listen() = 0; | |
237 // Listen for incoming connections on this address. | |
238 // | |
239 // The address must be local. | |
240 | |
241 virtual Own<DatagramPort> bindDatagramPort(); | |
242 // Open this address as a datagram (e.g. UDP) port. | |
243 // | |
244 // The address must be local. | |
245 | |
246 virtual Own<NetworkAddress> clone() = 0; | |
247 // Returns an equivalent copy of this NetworkAddress. | |
248 | |
249 virtual String toString() = 0; | |
250 // Produce a human-readable string which hopefully can be passed to Network::parseAddress() | |
251 // to reproduce this address, although whether or not that works of course depends on the Network | |
252 // implementation. This should be called only to display the address to human users, who will | |
253 // hopefully know what they are able to do with it. | |
254 }; | |
255 | |
256 class Network { | |
257 // Factory for NetworkAddress instances, representing the network services offered by the | |
258 // operating system. | |
259 // | |
260 // This interface typically represents broad authority, and well-designed code should limit its | |
261 // use to high-level startup code and user interaction. Low-level APIs should accept | |
262 // NetworkAddress instances directly and work from there, if at all possible. | |
263 | |
264 public: | |
265 virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0; | |
266 // Construct a network address from a user-provided string. The format of the address | |
267 // strings is not specified at the API level, and application code should make no assumptions | |
268 // about them. These strings should always be provided by humans, and said humans will know | |
269 // what format to use in their particular context. | |
270 // | |
271 // `portHint`, if provided, specifies the "standard" IP port number for the application-level | |
272 // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a | |
273 // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is | |
274 // omitted, then the returned address will only support listen() and bindDatagramPort() | |
275 // (not connect()), and an unused port will be chosen each time one of those methods is called. | |
276 | |
277 virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0; | |
278 // Construct a network address from a legacy struct sockaddr. | |
279 }; | |
280 | |
281 // ======================================================================================= | |
282 // I/O Provider | |
283 | |
284 class AsyncIoProvider { | |
285 // Class which constructs asynchronous wrappers around the operating system's I/O facilities. | |
286 // | |
287 // Generally, the implementation of this interface must integrate closely with a particular | |
288 // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide | |
289 // an AsyncIoProvider. | |
290 | |
291 public: | |
292 virtual OneWayPipe newOneWayPipe() = 0; | |
293 // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with | |
294 // the pipe(2) system call). | |
295 | |
296 virtual TwoWayPipe newTwoWayPipe() = 0; | |
297 // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with | |
298 // socketpair(2) system call). Data written to one end can be read from the other. | |
299 | |
300 virtual Network& getNetwork() = 0; | |
301 // Creates a new `Network` instance representing the networks exposed by the operating system. | |
302 // | |
303 // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If | |
304 // you call this from low-level code, then you are preventing higher-level code from injecting an | |
305 // alternative implementation. Instead, if your code needs to use network functionality, it | |
306 // should ask for a `Network` as a constructor or method parameter, so that higher-level code can | |
307 // chose what implementation to use. The system network is essentially a singleton. See: | |
308 // http://www.object-oriented-security.org/lets-argue/singletons | |
309 // | |
310 // Code that uses the system network should not make any assumptions about what kinds of | |
311 // addresses it will parse, as this could differ across platforms. String addresses should come | |
312 // strictly from the user, who will know how to write them correctly for their system. | |
313 // | |
314 // With that said, KJ currently supports the following string address formats: | |
315 // - IPv4: "1.2.3.4", "1.2.3.4:80" | |
316 // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" | |
317 // - Local IP wildcard (covers both v4 and v6): "*", "*:80" | |
318 // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" | |
319 // - Unix domain: "unix:/path/to/socket" | |
320 | |
321 struct PipeThread { | |
322 // A combination of a thread and a two-way pipe that communicates with that thread. | |
323 // | |
324 // The fields are intentionally ordered so that the pipe will be destroyed (and therefore | |
325 // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread | |
326 // arranges to exit when it detects disconnect, destruction should be clean. | |
327 | |
328 Own<Thread> thread; | |
329 Own<AsyncIoStream> pipe; | |
330 }; | |
331 | |
332 virtual PipeThread newPipeThread( | |
333 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0; | |
334 // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate | |
335 // with it. One end of the pipe is passed to the thread's start function and the other end of | |
336 // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will | |
337 // already have an active `EventLoop` when `startFunc` is called. | |
338 // | |
339 // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too | |
340 // much at once but I'm not sure how to cleanly break it down. | |
341 | |
342 virtual Timer& getTimer() = 0; | |
343 // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- | |
344 // it only updates when the event loop polls for system events. This means that calling `now()` | |
345 // on this timer does not require a system call. | |
346 // | |
347 // This timer is not affected by changes to the system date. It is unspecified whether the timer | |
348 // continues to count while the system is suspended. | |
349 }; | |
350 | |
351 class LowLevelAsyncIoProvider { | |
352 // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on | |
353 // different operating systems. You should prefer to use `AsyncIoProvider` over this interface | |
354 // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. | |
355 // | |
356 // On Unix, this interface can be used to import native file descriptors into the async framework. | |
357 // Different implementations of this interface might work on top of different event handling | |
358 // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. | |
359 // | |
360 // On Windows, this interface can be used to import native HANDLEs into the async framework. | |
361 // Different implementations of this interface might work on top of different event handling | |
362 // primitives, such as I/O completion ports vs. completion routines. | |
363 // | |
364 // TODO(port): Actually implement Windows support. | |
365 | |
366 public: | |
367 // --------------------------------------------------------------------------- | |
368 // Unix-specific stuff | |
369 | |
370 enum Flags { | |
371 // Flags controlling how to wrap a file descriptor. | |
372 | |
373 TAKE_OWNERSHIP = 1 << 0, | |
374 // The returned object should own the file descriptor, automatically closing it when destroyed. | |
375 // The close-on-exec flag will be set on the descriptor if it is not already. | |
376 // | |
377 // If this flag is not used, then the file descriptor is not automatically closed and the | |
378 // close-on-exec flag is not modified. | |
379 | |
380 ALREADY_CLOEXEC = 1 << 1, | |
381 // Indicates that the close-on-exec flag is known already to be set, so need not be set again. | |
382 // Only relevant when combined with TAKE_OWNERSHIP. | |
383 // | |
384 // On Linux, all system calls which yield new file descriptors have flags or variants which | |
385 // set the close-on-exec flag immediately. Unfortunately, other OS's do not. | |
386 | |
387 ALREADY_NONBLOCK = 1 << 2 | |
388 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag | |
389 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode | |
390 // automatically. | |
391 // | |
392 // On Linux, all system calls which yield new file descriptors have flags or variants which | |
393 // enable non-blocking mode immediately. Unfortunately, other OS's do not. | |
394 }; | |
395 | |
396 virtual Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) = 0; | |
397 // Create an AsyncInputStream wrapping a file descriptor. | |
398 // | |
399 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
400 | |
401 virtual Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) = 0; | |
402 // Create an AsyncOutputStream wrapping a file descriptor. | |
403 // | |
404 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
405 | |
406 virtual Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) = 0; | |
407 // Create an AsyncIoStream wrapping a socket file descriptor. | |
408 // | |
409 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
410 | |
411 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) = 0; | |
412 // Create an AsyncIoStream wrapping a socket that is in the process of connecting. The returned | |
413 // promise should not resolve until connection has completed -- traditionally indicated by the | |
414 // descriptor becoming writable. | |
415 // | |
416 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
417 | |
418 virtual Own<ConnectionReceiver> wrapListenSocketFd(int fd, uint flags = 0) = 0; | |
419 // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already | |
420 // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. | |
421 // | |
422 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
423 | |
424 virtual Own<DatagramPort> wrapDatagramSocketFd(int fd, uint flags = 0); | |
425 | |
426 virtual Timer& getTimer() = 0; | |
427 // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- | |
428 // it only updates when the event loop polls for system events. This means that calling `now()` | |
429 // on this timer does not require a system call. | |
430 // | |
431 // This timer is not affected by changes to the system date. It is unspecified whether the timer | |
432 // continues to count while the system is suspended. | |
433 }; | |
434 | |
435 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); | |
436 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. | |
437 | |
438 struct AsyncIoContext { | |
439 Own<LowLevelAsyncIoProvider> lowLevelProvider; | |
440 Own<AsyncIoProvider> provider; | |
441 WaitScope& waitScope; | |
442 | |
443 UnixEventPort& unixEventPort; | |
444 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This | |
445 // field will go away at some point when we have a chance to improve these interfaces. | |
446 }; | |
447 | |
448 AsyncIoContext setupAsyncIo(); | |
449 // Convenience method which sets up the current thread with everything it needs to do async I/O. | |
450 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for | |
451 // doing I/O on the host system, so everything is ready for the thread to start making async calls | |
452 // and waiting on promises. | |
453 // | |
454 // You would typically call this in your main() loop or in the start function of a thread. | |
455 // Example: | |
456 // | |
457 // int main() { | |
458 // auto ioContext = kj::setupAsyncIo(); | |
459 // | |
460 // // Now we can call an async function. | |
461 // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com"); | |
462 // | |
463 // // And we can wait for the promise to complete. Note that you can only use `wait()` | |
464 // // from the top level, not from inside a promise callback. | |
465 // String text = textPromise.wait(ioContext.waitScope); | |
466 // print(text); | |
467 // return 0; | |
468 // } | |
469 // | |
470 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In | |
471 // particular, note that after a fork(), an AsyncIoContext created in the parent process will | |
472 // not work correctly in the child, even if the parent ceases to use its copy. In particular | |
473 // note that this means that server processes which daemonize themselves at startup must wait | |
474 // until after daemonization to create an AsyncIoContext. | |
475 | |
476 // ======================================================================================= | |
477 // inline implementation details | |
478 | |
479 inline AncillaryMessage::AncillaryMessage( | |
480 int level, int type, ArrayPtr<const byte> data) | |
481 : level(level), type(type), data(data) {} | |
482 | |
483 inline int AncillaryMessage::getLevel() const { return level; } | |
484 inline int AncillaryMessage::getType() const { return type; } | |
485 | |
486 template <typename T> | |
487 inline Maybe<const T&> AncillaryMessage::as() { | |
488 if (data.size() >= sizeof(T)) { | |
489 return *reinterpret_cast<const T*>(data.begin()); | |
490 } else { | |
491 return nullptr; | |
492 } | |
493 } | |
494 | |
495 template <typename T> | |
496 inline ArrayPtr<const T> AncillaryMessage::asArray() { | |
497 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T)); | |
498 } | |
499 | |
500 } // namespace kj | |
501 | |
502 #endif // KJ_ASYNC_IO_H_ |