cannam@134
|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
|
cannam@134
|
2 // Licensed under the MIT License:
|
cannam@134
|
3 //
|
cannam@134
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
cannam@134
|
5 // of this software and associated documentation files (the "Software"), to deal
|
cannam@134
|
6 // in the Software without restriction, including without limitation the rights
|
cannam@134
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
cannam@134
|
8 // copies of the Software, and to permit persons to whom the Software is
|
cannam@134
|
9 // furnished to do so, subject to the following conditions:
|
cannam@134
|
10 //
|
cannam@134
|
11 // The above copyright notice and this permission notice shall be included in
|
cannam@134
|
12 // all copies or substantial portions of the Software.
|
cannam@134
|
13 //
|
cannam@134
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
cannam@134
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
cannam@134
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
cannam@134
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
cannam@134
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
cannam@134
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
cannam@134
|
20 // THE SOFTWARE.
|
cannam@134
|
21
|
cannam@134
|
22 #ifndef KJ_ASYNC_IO_H_
|
cannam@134
|
23 #define KJ_ASYNC_IO_H_
|
cannam@134
|
24
|
cannam@134
|
25 #if defined(__GNUC__) && !KJ_HEADER_WARNINGS
|
cannam@134
|
26 #pragma GCC system_header
|
cannam@134
|
27 #endif
|
cannam@134
|
28
|
cannam@134
|
29 #include "async.h"
|
cannam@134
|
30 #include "function.h"
|
cannam@134
|
31 #include "thread.h"
|
cannam@134
|
32 #include "time.h"
|
cannam@134
|
33
|
cannam@134
|
34 struct sockaddr;
|
cannam@134
|
35
|
cannam@134
|
36 namespace kj {
|
cannam@134
|
37
|
cannam@134
|
38 class UnixEventPort;
|
cannam@134
|
39 class NetworkAddress;
|
cannam@134
|
40
|
cannam@134
|
41 // =======================================================================================
|
cannam@134
|
42 // Streaming I/O
|
cannam@134
|
43
|
cannam@134
|
44 class AsyncInputStream {
|
cannam@134
|
45 // Asynchronous equivalent of InputStream (from io.h).
|
cannam@134
|
46
|
cannam@134
|
47 public:
|
cannam@134
|
48 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) = 0;
|
cannam@134
|
49 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
|
cannam@134
|
50
|
cannam@134
|
51 Promise<void> read(void* buffer, size_t bytes);
|
cannam@134
|
52 };
|
cannam@134
|
53
|
cannam@134
|
54 class AsyncOutputStream {
|
cannam@134
|
55 // Asynchronous equivalent of OutputStream (from io.h).
|
cannam@134
|
56
|
cannam@134
|
57 public:
|
cannam@134
|
58 virtual Promise<void> write(const void* buffer, size_t size) = 0;
|
cannam@134
|
59 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0;
|
cannam@134
|
60 };
|
cannam@134
|
61
|
cannam@134
|
62 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
|
cannam@134
|
63 // A combination input and output stream.
|
cannam@134
|
64
|
cannam@134
|
65 public:
|
cannam@134
|
66 virtual void shutdownWrite() = 0;
|
cannam@134
|
67 // Cleanly shut down just the write end of the stream, while keeping the read end open.
|
cannam@134
|
68
|
cannam@134
|
69 virtual void abortRead() {}
|
cannam@134
|
70 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only
|
cannam@134
|
71 // be called when an error has occurred.
|
cannam@134
|
72
|
cannam@134
|
73 virtual void getsockopt(int level, int option, void* value, uint* length);
|
cannam@134
|
74 virtual void setsockopt(int level, int option, const void* value, uint length);
|
cannam@134
|
75 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
|
cannam@134
|
76 // if the stream is not a socket or the option is not appropriate for the socket type. The
|
cannam@134
|
77 // default implementations always throw "unimplemented".
|
cannam@134
|
78
|
cannam@134
|
79 virtual void getsockname(struct sockaddr* addr, uint* length);
|
cannam@134
|
80 virtual void getpeername(struct sockaddr* addr, uint* length);
|
cannam@134
|
81 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented"
|
cannam@134
|
82 // exception if the stream is not a socket. The default implementations always throw
|
cannam@134
|
83 // "unimplemented".
|
cannam@134
|
84 //
|
cannam@134
|
85 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't
|
cannam@134
|
86 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are
|
cannam@134
|
87 // ephemeral addresses for a single connection.
|
cannam@134
|
88 };
|
cannam@134
|
89
|
cannam@134
|
90 struct OneWayPipe {
|
cannam@134
|
91 // A data pipe with an input end and an output end. (Typically backed by pipe() system call.)
|
cannam@134
|
92
|
cannam@134
|
93 Own<AsyncInputStream> in;
|
cannam@134
|
94 Own<AsyncOutputStream> out;
|
cannam@134
|
95 };
|
cannam@134
|
96
|
cannam@134
|
97 struct TwoWayPipe {
|
cannam@134
|
98 // A data pipe that supports sending in both directions. Each end's output sends data to the
|
cannam@134
|
99 // other end's input. (Typically backed by socketpair() system call.)
|
cannam@134
|
100
|
cannam@134
|
101 Own<AsyncIoStream> ends[2];
|
cannam@134
|
102 };
|
cannam@134
|
103
|
cannam@134
|
104 class ConnectionReceiver {
|
cannam@134
|
105 // Represents a server socket listening on a port.
|
cannam@134
|
106
|
cannam@134
|
107 public:
|
cannam@134
|
108 virtual Promise<Own<AsyncIoStream>> accept() = 0;
|
cannam@134
|
109 // Accept the next incoming connection.
|
cannam@134
|
110
|
cannam@134
|
111 virtual uint getPort() = 0;
|
cannam@134
|
112 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
|
cannam@134
|
113 // specify a port when constructing the NetworkAddress -- one will have been assigned
|
cannam@134
|
114 // automatically.
|
cannam@134
|
115
|
cannam@134
|
116 virtual void getsockopt(int level, int option, void* value, uint* length);
|
cannam@134
|
117 virtual void setsockopt(int level, int option, const void* value, uint length);
|
cannam@134
|
118 // Same as the methods of AsyncIoStream.
|
cannam@134
|
119 };
|
cannam@134
|
120
|
cannam@134
|
121 // =======================================================================================
|
cannam@134
|
122 // Datagram I/O
|
cannam@134
|
123
|
cannam@134
|
124 class AncillaryMessage {
|
cannam@134
|
125 // Represents an ancillary message (aka control message) received using the recvmsg() system
|
cannam@134
|
126 // call (or equivalent). Most apps will not use this.
|
cannam@134
|
127
|
cannam@134
|
128 public:
|
cannam@134
|
129 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
|
cannam@134
|
130 AncillaryMessage() = default;
|
cannam@134
|
131
|
cannam@134
|
132 inline int getLevel() const;
|
cannam@134
|
133 // Originating protocol / socket level.
|
cannam@134
|
134
|
cannam@134
|
135 inline int getType() const;
|
cannam@134
|
136 // Protocol-specific message type.
|
cannam@134
|
137
|
cannam@134
|
138 template <typename T>
|
cannam@134
|
139 inline Maybe<const T&> as();
|
cannam@134
|
140 // Interpret the ancillary message as the given struct type. Most ancillary messages are some
|
cannam@134
|
141 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message
|
cannam@134
|
142 // is smaller than the struct -- this can happen if the message was truncated due to
|
cannam@134
|
143 // insufficient ancillary buffer space.
|
cannam@134
|
144
|
cannam@134
|
145 template <typename T>
|
cannam@134
|
146 inline ArrayPtr<const T> asArray();
|
cannam@134
|
147 // Interpret the ancillary message as an array of items. If the message size does not evenly
|
cannam@134
|
148 // divide into elements of type T, the remainder is discarded -- this can happen if the message
|
cannam@134
|
149 // was truncated due to insufficient ancillary buffer space.
|
cannam@134
|
150
|
cannam@134
|
151 private:
|
cannam@134
|
152 int level;
|
cannam@134
|
153 int type;
|
cannam@134
|
154 ArrayPtr<const byte> data;
|
cannam@134
|
155 // Message data. In most cases you should use `as()` or `asArray()`.
|
cannam@134
|
156 };
|
cannam@134
|
157
|
cannam@134
|
158 class DatagramReceiver {
|
cannam@134
|
159 // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
|
cannam@134
|
160 // capacity in advance; if a received packet is larger than the capacity, it will be truncated.
|
cannam@134
|
161
|
cannam@134
|
162 public:
|
cannam@134
|
163 virtual Promise<void> receive() = 0;
|
cannam@134
|
164 // Receive a new message, overwriting this object's content.
|
cannam@134
|
165 //
|
cannam@134
|
166 // receive() may reuse the same buffers for content and ancillary data with each call.
|
cannam@134
|
167
|
cannam@134
|
168 template <typename T>
|
cannam@134
|
169 struct MaybeTruncated {
|
cannam@134
|
170 T value;
|
cannam@134
|
171
|
cannam@134
|
172 bool isTruncated;
|
cannam@134
|
173 // True if the Receiver's capacity was insufficient to receive the value and therefore the
|
cannam@134
|
174 // value is truncated.
|
cannam@134
|
175 };
|
cannam@134
|
176
|
cannam@134
|
177 virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
|
cannam@134
|
178 // Get the content of the datagram.
|
cannam@134
|
179
|
cannam@134
|
180 virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
|
cannam@134
|
181 // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr
|
cannam@134
|
182 // struct. Most apps don't need this.
|
cannam@134
|
183 //
|
cannam@134
|
184 // If the returned value is truncated, then the last message in the array may itself be
|
cannam@134
|
185 // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
|
cannam@134
|
186 // return fewer elements than expected. Truncation can also mean that additional messages were
|
cannam@134
|
187 // available but discarded.
|
cannam@134
|
188
|
cannam@134
|
189 virtual NetworkAddress& getSource() = 0;
|
cannam@134
|
190 // Get the datagram sender's address.
|
cannam@134
|
191
|
cannam@134
|
192 struct Capacity {
|
cannam@134
|
193 size_t content = 8192;
|
cannam@134
|
194 // How much space to allocate for the datagram content. If a datagram is received that is
|
cannam@134
|
195 // larger than this, it will be truncated, with no way to recover the tail.
|
cannam@134
|
196
|
cannam@134
|
197 size_t ancillary = 0;
|
cannam@134
|
198 // How much space to allocate for ancillary messages. As with content, if the ancillary data
|
cannam@134
|
199 // is larger than this, it will be truncated.
|
cannam@134
|
200 };
|
cannam@134
|
201 };
|
cannam@134
|
202
|
cannam@134
|
203 class DatagramPort {
|
cannam@134
|
204 public:
|
cannam@134
|
205 virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
|
cannam@134
|
206 virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
|
cannam@134
|
207 NetworkAddress& destination) = 0;
|
cannam@134
|
208
|
cannam@134
|
209 virtual Own<DatagramReceiver> makeReceiver(
|
cannam@134
|
210 DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
|
cannam@134
|
211 // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
|
cannam@134
|
212 // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.
|
cannam@134
|
213
|
cannam@134
|
214 virtual uint getPort() = 0;
|
cannam@134
|
215 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
|
cannam@134
|
216 // specify a port when constructing the NetworkAddress -- one will have been assigned
|
cannam@134
|
217 // automatically.
|
cannam@134
|
218
|
cannam@134
|
219 virtual void getsockopt(int level, int option, void* value, uint* length);
|
cannam@134
|
220 virtual void setsockopt(int level, int option, const void* value, uint length);
|
cannam@134
|
221 // Same as the methods of AsyncIoStream.
|
cannam@134
|
222 };
|
cannam@134
|
223
|
cannam@134
|
224 // =======================================================================================
|
cannam@134
|
225 // Networks
|
cannam@134
|
226
|
cannam@134
|
227 class NetworkAddress {
|
cannam@134
|
228 // Represents a remote address to which the application can connect.
|
cannam@134
|
229
|
cannam@134
|
230 public:
|
cannam@134
|
231 virtual Promise<Own<AsyncIoStream>> connect() = 0;
|
cannam@134
|
232 // Make a new connection to this address.
|
cannam@134
|
233 //
|
cannam@134
|
234 // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number.
|
cannam@134
|
235
|
cannam@134
|
236 virtual Own<ConnectionReceiver> listen() = 0;
|
cannam@134
|
237 // Listen for incoming connections on this address.
|
cannam@134
|
238 //
|
cannam@134
|
239 // The address must be local.
|
cannam@134
|
240
|
cannam@134
|
241 virtual Own<DatagramPort> bindDatagramPort();
|
cannam@134
|
242 // Open this address as a datagram (e.g. UDP) port.
|
cannam@134
|
243 //
|
cannam@134
|
244 // The address must be local.
|
cannam@134
|
245
|
cannam@134
|
246 virtual Own<NetworkAddress> clone() = 0;
|
cannam@134
|
247 // Returns an equivalent copy of this NetworkAddress.
|
cannam@134
|
248
|
cannam@134
|
249 virtual String toString() = 0;
|
cannam@134
|
250 // Produce a human-readable string which hopefully can be passed to Network::parseAddress()
|
cannam@134
|
251 // to reproduce this address, although whether or not that works of course depends on the Network
|
cannam@134
|
252 // implementation. This should be called only to display the address to human users, who will
|
cannam@134
|
253 // hopefully know what they are able to do with it.
|
cannam@134
|
254 };
|
cannam@134
|
255
|
cannam@134
|
256 class Network {
|
cannam@134
|
257 // Factory for NetworkAddress instances, representing the network services offered by the
|
cannam@134
|
258 // operating system.
|
cannam@134
|
259 //
|
cannam@134
|
260 // This interface typically represents broad authority, and well-designed code should limit its
|
cannam@134
|
261 // use to high-level startup code and user interaction. Low-level APIs should accept
|
cannam@134
|
262 // NetworkAddress instances directly and work from there, if at all possible.
|
cannam@134
|
263
|
cannam@134
|
264 public:
|
cannam@134
|
265 virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0;
|
cannam@134
|
266 // Construct a network address from a user-provided string. The format of the address
|
cannam@134
|
267 // strings is not specified at the API level, and application code should make no assumptions
|
cannam@134
|
268 // about them. These strings should always be provided by humans, and said humans will know
|
cannam@134
|
269 // what format to use in their particular context.
|
cannam@134
|
270 //
|
cannam@134
|
271 // `portHint`, if provided, specifies the "standard" IP port number for the application-level
|
cannam@134
|
272 // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a
|
cannam@134
|
273 // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is
|
cannam@134
|
274 // omitted, then the returned address will only support listen() and bindDatagramPort()
|
cannam@134
|
275 // (not connect()), and an unused port will be chosen each time one of those methods is called.
|
cannam@134
|
276
|
cannam@134
|
277 virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
|
cannam@134
|
278 // Construct a network address from a legacy struct sockaddr.
|
cannam@134
|
279 };
|
cannam@134
|
280
|
cannam@134
|
281 // =======================================================================================
|
cannam@134
|
282 // I/O Provider
|
cannam@134
|
283
|
cannam@134
|
284 class AsyncIoProvider {
|
cannam@134
|
285 // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
|
cannam@134
|
286 //
|
cannam@134
|
287 // Generally, the implementation of this interface must integrate closely with a particular
|
cannam@134
|
288 // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide
|
cannam@134
|
289 // an AsyncIoProvider.
|
cannam@134
|
290
|
cannam@134
|
291 public:
|
cannam@134
|
292 virtual OneWayPipe newOneWayPipe() = 0;
|
cannam@134
|
293 // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with
|
cannam@134
|
294 // the pipe(2) system call).
|
cannam@134
|
295
|
cannam@134
|
296 virtual TwoWayPipe newTwoWayPipe() = 0;
|
cannam@134
|
297 // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with
|
cannam@134
|
298 // socketpair(2) system call). Data written to one end can be read from the other.
|
cannam@134
|
299
|
cannam@134
|
300 virtual Network& getNetwork() = 0;
|
cannam@134
|
301 // Creates a new `Network` instance representing the networks exposed by the operating system.
|
cannam@134
|
302 //
|
cannam@134
|
303 // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If
|
cannam@134
|
304 // you call this from low-level code, then you are preventing higher-level code from injecting an
|
cannam@134
|
305 // alternative implementation. Instead, if your code needs to use network functionality, it
|
cannam@134
|
306 // should ask for a `Network` as a constructor or method parameter, so that higher-level code can
|
cannam@134
|
307 // chose what implementation to use. The system network is essentially a singleton. See:
|
cannam@134
|
308 // http://www.object-oriented-security.org/lets-argue/singletons
|
cannam@134
|
309 //
|
cannam@134
|
310 // Code that uses the system network should not make any assumptions about what kinds of
|
cannam@134
|
311 // addresses it will parse, as this could differ across platforms. String addresses should come
|
cannam@134
|
312 // strictly from the user, who will know how to write them correctly for their system.
|
cannam@134
|
313 //
|
cannam@134
|
314 // With that said, KJ currently supports the following string address formats:
|
cannam@134
|
315 // - IPv4: "1.2.3.4", "1.2.3.4:80"
|
cannam@134
|
316 // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
|
cannam@134
|
317 // - Local IP wildcard (covers both v4 and v6): "*", "*:80"
|
cannam@134
|
318 // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http"
|
cannam@134
|
319 // - Unix domain: "unix:/path/to/socket"
|
cannam@134
|
320
|
cannam@134
|
321 struct PipeThread {
|
cannam@134
|
322 // A combination of a thread and a two-way pipe that communicates with that thread.
|
cannam@134
|
323 //
|
cannam@134
|
324 // The fields are intentionally ordered so that the pipe will be destroyed (and therefore
|
cannam@134
|
325 // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread
|
cannam@134
|
326 // arranges to exit when it detects disconnect, destruction should be clean.
|
cannam@134
|
327
|
cannam@134
|
328 Own<Thread> thread;
|
cannam@134
|
329 Own<AsyncIoStream> pipe;
|
cannam@134
|
330 };
|
cannam@134
|
331
|
cannam@134
|
332 virtual PipeThread newPipeThread(
|
cannam@134
|
333 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0;
|
cannam@134
|
334 // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate
|
cannam@134
|
335 // with it. One end of the pipe is passed to the thread's start function and the other end of
|
cannam@134
|
336 // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will
|
cannam@134
|
337 // already have an active `EventLoop` when `startFunc` is called.
|
cannam@134
|
338 //
|
cannam@134
|
339 // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too
|
cannam@134
|
340 // much at once but I'm not sure how to cleanly break it down.
|
cannam@134
|
341
|
cannam@134
|
342 virtual Timer& getTimer() = 0;
|
cannam@134
|
343 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
|
cannam@134
|
344 // it only updates when the event loop polls for system events. This means that calling `now()`
|
cannam@134
|
345 // on this timer does not require a system call.
|
cannam@134
|
346 //
|
cannam@134
|
347 // This timer is not affected by changes to the system date. It is unspecified whether the timer
|
cannam@134
|
348 // continues to count while the system is suspended.
|
cannam@134
|
349 };
|
cannam@134
|
350
|
cannam@134
|
351 class LowLevelAsyncIoProvider {
|
cannam@134
|
352 // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on
|
cannam@134
|
353 // different operating systems. You should prefer to use `AsyncIoProvider` over this interface
|
cannam@134
|
354 // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection.
|
cannam@134
|
355 //
|
cannam@134
|
356 // On Unix, this interface can be used to import native file descriptors into the async framework.
|
cannam@134
|
357 // Different implementations of this interface might work on top of different event handling
|
cannam@134
|
358 // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library.
|
cannam@134
|
359 //
|
cannam@134
|
360 // On Windows, this interface can be used to import native HANDLEs into the async framework.
|
cannam@134
|
361 // Different implementations of this interface might work on top of different event handling
|
cannam@134
|
362 // primitives, such as I/O completion ports vs. completion routines.
|
cannam@134
|
363 //
|
cannam@134
|
364 // TODO(port): Actually implement Windows support.
|
cannam@134
|
365
|
cannam@134
|
366 public:
|
cannam@134
|
367 // ---------------------------------------------------------------------------
|
cannam@134
|
368 // Unix-specific stuff
|
cannam@134
|
369
|
cannam@134
|
370 enum Flags {
|
cannam@134
|
371 // Flags controlling how to wrap a file descriptor.
|
cannam@134
|
372
|
cannam@134
|
373 TAKE_OWNERSHIP = 1 << 0,
|
cannam@134
|
374 // The returned object should own the file descriptor, automatically closing it when destroyed.
|
cannam@134
|
375 // The close-on-exec flag will be set on the descriptor if it is not already.
|
cannam@134
|
376 //
|
cannam@134
|
377 // If this flag is not used, then the file descriptor is not automatically closed and the
|
cannam@134
|
378 // close-on-exec flag is not modified.
|
cannam@134
|
379
|
cannam@134
|
380 ALREADY_CLOEXEC = 1 << 1,
|
cannam@134
|
381 // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
|
cannam@134
|
382 // Only relevant when combined with TAKE_OWNERSHIP.
|
cannam@134
|
383 //
|
cannam@134
|
384 // On Linux, all system calls which yield new file descriptors have flags or variants which
|
cannam@134
|
385 // set the close-on-exec flag immediately. Unfortunately, other OS's do not.
|
cannam@134
|
386
|
cannam@134
|
387 ALREADY_NONBLOCK = 1 << 2
|
cannam@134
|
388 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag
|
cannam@134
|
389 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode
|
cannam@134
|
390 // automatically.
|
cannam@134
|
391 //
|
cannam@134
|
392 // On Linux, all system calls which yield new file descriptors have flags or variants which
|
cannam@134
|
393 // enable non-blocking mode immediately. Unfortunately, other OS's do not.
|
cannam@134
|
394 };
|
cannam@134
|
395
|
cannam@134
|
396 virtual Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) = 0;
|
cannam@134
|
397 // Create an AsyncInputStream wrapping a file descriptor.
|
cannam@134
|
398 //
|
cannam@134
|
399 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
cannam@134
|
400
|
cannam@134
|
401 virtual Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) = 0;
|
cannam@134
|
402 // Create an AsyncOutputStream wrapping a file descriptor.
|
cannam@134
|
403 //
|
cannam@134
|
404 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
cannam@134
|
405
|
cannam@134
|
406 virtual Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) = 0;
|
cannam@134
|
407 // Create an AsyncIoStream wrapping a socket file descriptor.
|
cannam@134
|
408 //
|
cannam@134
|
409 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
cannam@134
|
410
|
cannam@134
|
411 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) = 0;
|
cannam@134
|
412 // Create an AsyncIoStream wrapping a socket that is in the process of connecting. The returned
|
cannam@134
|
413 // promise should not resolve until connection has completed -- traditionally indicated by the
|
cannam@134
|
414 // descriptor becoming writable.
|
cannam@134
|
415 //
|
cannam@134
|
416 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
cannam@134
|
417
|
cannam@134
|
418 virtual Own<ConnectionReceiver> wrapListenSocketFd(int fd, uint flags = 0) = 0;
|
cannam@134
|
419 // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already
|
cannam@134
|
420 // have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
|
cannam@134
|
421 //
|
cannam@134
|
422 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
cannam@134
|
423
|
cannam@134
|
424 virtual Own<DatagramPort> wrapDatagramSocketFd(int fd, uint flags = 0);
|
cannam@134
|
425
|
cannam@134
|
426 virtual Timer& getTimer() = 0;
|
cannam@134
|
427 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
|
cannam@134
|
428 // it only updates when the event loop polls for system events. This means that calling `now()`
|
cannam@134
|
429 // on this timer does not require a system call.
|
cannam@134
|
430 //
|
cannam@134
|
431 // This timer is not affected by changes to the system date. It is unspecified whether the timer
|
cannam@134
|
432 // continues to count while the system is suspended.
|
cannam@134
|
433 };
|
cannam@134
|
434
|
cannam@134
|
435 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
|
cannam@134
|
436 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`.
|
cannam@134
|
437
|
cannam@134
|
438 struct AsyncIoContext {
|
cannam@134
|
439 Own<LowLevelAsyncIoProvider> lowLevelProvider;
|
cannam@134
|
440 Own<AsyncIoProvider> provider;
|
cannam@134
|
441 WaitScope& waitScope;
|
cannam@134
|
442
|
cannam@134
|
443 UnixEventPort& unixEventPort;
|
cannam@134
|
444 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
|
cannam@134
|
445 // field will go away at some point when we have a chance to improve these interfaces.
|
cannam@134
|
446 };
|
cannam@134
|
447
|
cannam@134
|
448 AsyncIoContext setupAsyncIo();
|
cannam@134
|
449 // Convenience method which sets up the current thread with everything it needs to do async I/O.
|
cannam@134
|
450 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for
|
cannam@134
|
451 // doing I/O on the host system, so everything is ready for the thread to start making async calls
|
cannam@134
|
452 // and waiting on promises.
|
cannam@134
|
453 //
|
cannam@134
|
454 // You would typically call this in your main() loop or in the start function of a thread.
|
cannam@134
|
455 // Example:
|
cannam@134
|
456 //
|
cannam@134
|
457 // int main() {
|
cannam@134
|
458 // auto ioContext = kj::setupAsyncIo();
|
cannam@134
|
459 //
|
cannam@134
|
460 // // Now we can call an async function.
|
cannam@134
|
461 // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com");
|
cannam@134
|
462 //
|
cannam@134
|
463 // // And we can wait for the promise to complete. Note that you can only use `wait()`
|
cannam@134
|
464 // // from the top level, not from inside a promise callback.
|
cannam@134
|
465 // String text = textPromise.wait(ioContext.waitScope);
|
cannam@134
|
466 // print(text);
|
cannam@134
|
467 // return 0;
|
cannam@134
|
468 // }
|
cannam@134
|
469 //
|
cannam@134
|
470 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In
|
cannam@134
|
471 // particular, note that after a fork(), an AsyncIoContext created in the parent process will
|
cannam@134
|
472 // not work correctly in the child, even if the parent ceases to use its copy. In particular
|
cannam@134
|
473 // note that this means that server processes which daemonize themselves at startup must wait
|
cannam@134
|
474 // until after daemonization to create an AsyncIoContext.
|
cannam@134
|
475
|
cannam@134
|
476 // =======================================================================================
|
cannam@134
|
477 // inline implementation details
|
cannam@134
|
478
|
cannam@134
|
479 inline AncillaryMessage::AncillaryMessage(
|
cannam@134
|
480 int level, int type, ArrayPtr<const byte> data)
|
cannam@134
|
481 : level(level), type(type), data(data) {}
|
cannam@134
|
482
|
cannam@134
|
483 inline int AncillaryMessage::getLevel() const { return level; }
|
cannam@134
|
484 inline int AncillaryMessage::getType() const { return type; }
|
cannam@134
|
485
|
cannam@134
|
486 template <typename T>
|
cannam@134
|
487 inline Maybe<const T&> AncillaryMessage::as() {
|
cannam@134
|
488 if (data.size() >= sizeof(T)) {
|
cannam@134
|
489 return *reinterpret_cast<const T*>(data.begin());
|
cannam@134
|
490 } else {
|
cannam@134
|
491 return nullptr;
|
cannam@134
|
492 }
|
cannam@134
|
493 }
|
cannam@134
|
494
|
cannam@134
|
495 template <typename T>
|
cannam@134
|
496 inline ArrayPtr<const T> AncillaryMessage::asArray() {
|
cannam@134
|
497 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
|
cannam@134
|
498 }
|
cannam@134
|
499
|
cannam@134
|
500 } // namespace kj
|
cannam@134
|
501
|
cannam@134
|
502 #endif // KJ_ASYNC_IO_H_
|