cannam@135: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors cannam@135: // Licensed under the MIT License: cannam@135: // cannam@135: // Permission is hereby granted, free of charge, to any person obtaining a copy cannam@135: // of this software and associated documentation files (the "Software"), to deal cannam@135: // in the Software without restriction, including without limitation the rights cannam@135: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell cannam@135: // copies of the Software, and to permit persons to whom the Software is cannam@135: // furnished to do so, subject to the following conditions: cannam@135: // cannam@135: // The above copyright notice and this permission notice shall be included in cannam@135: // all copies or substantial portions of the Software. cannam@135: // cannam@135: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR cannam@135: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, cannam@135: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE cannam@135: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER cannam@135: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, cannam@135: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN cannam@135: // THE SOFTWARE. 
cannam@135: cannam@135: #ifndef KJ_ASYNC_H_ cannam@135: #define KJ_ASYNC_H_ cannam@135: cannam@135: #if defined(__GNUC__) && !KJ_HEADER_WARNINGS cannam@135: #pragma GCC system_header cannam@135: #endif cannam@135: cannam@135: #include "async-prelude.h" cannam@135: #include "exception.h" cannam@135: #include "refcount.h" cannam@135: cannam@135: namespace kj { cannam@135: cannam@135: class EventLoop; cannam@135: class WaitScope; cannam@135: cannam@135: template cannam@135: class Promise; cannam@135: template cannam@135: class ForkedPromise; cannam@135: template cannam@135: class PromiseFulfiller; cannam@135: template cannam@135: struct PromiseFulfillerPair; cannam@135: cannam@135: template cannam@135: using PromiseForResult = Promise<_::JoinPromises<_::ReturnType>>; cannam@135: // Evaluates to the type of Promise for the result of calling functor type Func with parameter type cannam@135: // T. If T is void, then the promise is for the result of calling Func with no arguments. If cannam@135: // Func itself returns a promise, the promises are joined, so you never get Promise>. cannam@135: cannam@135: // ======================================================================================= cannam@135: // Promises cannam@135: cannam@135: template cannam@135: class Promise: protected _::PromiseBase { cannam@135: // The basic primitive of asynchronous computation in KJ. Similar to "futures", but designed cannam@135: // specifically for event loop concurrency. Similar to E promises and JavaScript Promises/A. cannam@135: // cannam@135: // A Promise represents a promise to produce a value of type T some time in the future. Once cannam@135: // that value has been produced, the promise is "fulfilled". Alternatively, a promise can be cannam@135: // "broken", with an Exception describing what went wrong. You may implicitly convert a value of cannam@135: // type T to an already-fulfilled Promise. 
You may implicitly convert the constant cannam@135: // `kj::READY_NOW` to an already-fulfilled Promise. You may also implicitly convert a cannam@135: // `kj::Exception` to an already-broken promise of any type. cannam@135: // cannam@135: // Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed cannam@135: // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations cannam@135: // meant to fulfill the promise will be canceled if possible. All methods of `Promise` (unless cannam@135: // otherwise noted) actually consume the promise in the sense of move semantics. (Arguably they cannam@135: // should be rvalue-qualified, but at the time this interface was created compilers didn't widely cannam@135: // support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().) If cannam@135: // you want to use one Promise in two different places, you must fork it with `fork()`. cannam@135: // cannam@135: // To use the result of a Promise, you must call `then()` and supply a callback function to cannam@135: // call with the result. `then()` returns another promise, for the result of the callback. cannam@135: // Any time that this would result in Promise>, the promises are collapsed into a cannam@135: // simple Promise that first waits for the outer promise, then the inner. Example: cannam@135: // cannam@135: // // Open a remote file, read the content, and then count the cannam@135: // // number of lines of text. cannam@135: // // Note that none of the calls here block. `file`, `content` cannam@135: // // and `lineCount` are all initialized immediately before any cannam@135: // // asynchronous operations occur. The lambda callbacks are cannam@135: // // called later. 
cannam@135: // Promise> file = openFtp("ftp://host/foo/bar"); cannam@135: // Promise content = file.then( cannam@135: // [](Own file) -> Promise { cannam@135: // return file.readAll(); cannam@135: // }); cannam@135: // Promise lineCount = content.then( cannam@135: // [](String text) -> int { cannam@135: // uint count = 0; cannam@135: // for (char c: text) count += (c == '\n'); cannam@135: // return count; cannam@135: // }); cannam@135: // cannam@135: // For `then()` to work, the current thread must have an active `EventLoop`. Each callback cannam@135: // is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current cannam@135: // thread's event loop, you do not need to worry about two callbacks running at the same time. cannam@135: // You will need to set up at least one `EventLoop` at the top level of your program before you cannam@135: // can use promises. cannam@135: // cannam@135: // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`. cannam@135: // cannam@135: // Systems using promises should consider supporting the concept of "pipelining". Pipelining cannam@135: // means allowing a caller to start issuing method calls against a promised object before the cannam@135: // promise has actually been fulfilled. This is particularly useful if the promise is for a cannam@135: // remote object living across a network, as this can avoid round trips when chaining a series cannam@135: // of calls. It is suggested that any class T which supports pipelining implement a subclass of cannam@135: // Promise which adds "eventual send" methods -- methods which, when called, say "please cannam@135: // invoke the corresponding method on the promised value once it is available". These methods cannam@135: // should in turn return promises for the eventual results of said invocations. 
Cap'n Proto, cannam@135: // for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see cannam@135: // `capnp/capability.h`. cannam@135: // cannam@135: // KJ Promises are based on E promises: cannam@135: // http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises cannam@135: // cannam@135: // KJ Promises are also inspired in part by the evolving standards for JavaScript/ECMAScript cannam@135: // promises, which are themselves influenced by E promises: cannam@135: // http://promisesaplus.com/ cannam@135: // https://github.com/domenic/promises-unwrapping cannam@135: cannam@135: public: cannam@135: Promise(_::FixVoid value); cannam@135: // Construct an already-fulfilled Promise from a value of type T. For non-void promises, the cannam@135: // parameter type is simply T. So, e.g., in a function that returns `Promise`, you can cannam@135: // say `return 123;` to return a promise that is already fulfilled to 123. cannam@135: // cannam@135: // For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`. cannam@135: cannam@135: Promise(kj::Exception&& e); cannam@135: // Construct an already-broken Promise. cannam@135: cannam@135: inline Promise(decltype(nullptr)) {} cannam@135: cannam@135: template cannam@135: PromiseForResult then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException()) cannam@135: KJ_WARN_UNUSED_RESULT; cannam@135: // Register a continuation function to be executed when the promise completes. The continuation cannam@135: // (`func`) takes the promised value (an rvalue of type `T`) as its parameter. The continuation cannam@135: // may return a new value; `then()` itself returns a promise for the continuation's eventual cannam@135: // result. If the continuation itself returns a `Promise`, then `then()` shall also return cannam@135: // a `Promise` which first waits for the original promise, then executes the continuation, cannam@135: // then waits for the inner promise (i.e. 
it automatically "unwraps" the promise). cannam@135: // cannam@135: // In all cases, `then()` returns immediately. The continuation is executed later. The cannam@135: // continuation is always executed on the same EventLoop (and, therefore, the same thread) which cannam@135: // called `then()`, therefore no synchronization is necessary on state shared by the continuation cannam@135: // and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws cannam@135: // an exception. cannam@135: // cannam@135: // You may also specify an error handler continuation as the second parameter. `errorHandler` cannam@135: // must be a functor taking a parameter of type `kj::Exception&&`. It must return the same cannam@135: // type as `func` returns (except when `func` returns `Promise`, in which case `errorHandler` cannam@135: // may return either `Promise` or just `U`). The default error handler simply propagates the cannam@135: // exception to the returned promise. cannam@135: // cannam@135: // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise cannam@135: // is broken. When compiled with -fno-exceptions, the framework will still detect when a cannam@135: // recoverable exception was thrown inside of a continuation and will consider the promise cannam@135: // broken even though a (presumably garbage) result was returned. cannam@135: // cannam@135: // If the returned promise is destroyed before the callback runs, the callback will be canceled cannam@135: // (it will never run). cannam@135: // cannam@135: // Note that `then()` -- like all other Promise methods -- consumes the promise on which it is cannam@135: // called, in the sense of move semantics. After returning, the original promise is no longer cannam@135: // valid, but `then()` returns a new promise. 
cannam@135: // cannam@135: // *Advanced implementation tips:* Most users will never need to worry about the below, but cannam@135: // it is good to be aware of. cannam@135: // cannam@135: // As an optimization, if the callback function `func` does _not_ return another promise, then cannam@135: // execution of `func` itself may be delayed until its result is known to be needed. The cannam@135: // expectation here is that `func` is just doing some transformation on the results, not cannam@135: // scheduling any other actions, therefore the system doesn't need to be proactive about cannam@135: // evaluating it. This way, a chain of trivial then() transformations can be executed all at cannam@135: // once without repeatedly re-scheduling through the event loop. Use the `eagerlyEvaluate()` cannam@135: // method to suppress this behavior. cannam@135: // cannam@135: // On the other hand, if `func` _does_ return another promise, then the system evaluates `func` cannam@135: // as soon as possible, because the promise it returns might be for a newly-scheduled cannam@135: // long-running asynchronous task. cannam@135: // cannam@135: // As another optimization, when a callback function registered with `then()` is actually cannam@135: // scheduled, it is scheduled to occur immediately, preempting other work in the event queue. cannam@135: // This allows a long chain of `then`s to execute all at once, improving cache locality by cannam@135: // clustering operations on the same data. However, this implies that starvation can occur cannam@135: // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for cannam@135: // actual I/O. To solve this, use `kj::evalLater()` to yield control; this way, all other events cannam@135: // in the queue will get a chance to run before your callback is executed. 
cannam@135: cannam@135: Promise ignoreResult() KJ_WARN_UNUSED_RESULT { return then([](T&&) {}); } cannam@135: // Convenience method to convert the promise to a void promise by ignoring the return value. cannam@135: // cannam@135: // You must still wait on the returned promise if you want the task to execute. cannam@135: cannam@135: template cannam@135: Promise catch_(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT; cannam@135: // Equivalent to `.then(identityFunc, errorHandler)`, where `identityFunc` is a function that cannam@135: // just returns its input. cannam@135: cannam@135: T wait(WaitScope& waitScope); cannam@135: // Run the event loop until the promise is fulfilled, then return its result. If the promise cannam@135: // is rejected, throw an exception. cannam@135: // cannam@135: // wait() is primarily useful at the top level of a program -- typically, within the function cannam@135: // that allocated the EventLoop. For example, a program that performs one or two RPCs and then cannam@135: // exits would likely use wait() in its main() function to wait on each RPC. On the other hand, cannam@135: // server-side code generally cannot use wait(), because it has to be able to accept multiple cannam@135: // requests at once. cannam@135: // cannam@135: // If the promise is rejected, `wait()` throws an exception. If the program was compiled without cannam@135: // exceptions (-fno-exceptions), this will usually abort. In this case you really should first cannam@135: // use `then()` to set an appropriate handler for the exception case, so that the promise you cannam@135: // actually wait on never throws. cannam@135: // cannam@135: // `waitScope` is an object proving that the caller is in a scope where wait() is allowed. By cannam@135: // convention, any function which might call wait(), or which might call another function which cannam@135: // might call wait(), must take `WaitScope&` as one of its parameters. 
This is needed for two cannam@135: // reasons: cannam@135: // * `wait()` is not allowed during an event callback, because event callbacks are themselves cannam@135: // called during some other `wait()`, and such recursive `wait()`s would only be able to cannam@135: // complete in LIFO order, which might mean that the outer `wait()` ends up waiting longer cannam@135: // than it is supposed to. To prevent this, a `WaitScope` cannot be constructed or used during cannam@135: // an event callback. cannam@135: // * Since `wait()` runs the event loop, unrelated event callbacks may execute before `wait()` cannam@135: // returns. This means that anyone calling `wait()` must be reentrant -- state may change cannam@135: // around them in arbitrary ways. Therefore, callers really need to know if a function they cannam@135: // are calling might wait(), and the `WaitScope&` parameter makes this clear. cannam@135: // cannam@135: // TODO(someday): Implement fibers, and let them call wait() even when they are handling an cannam@135: // event. cannam@135: cannam@135: ForkedPromise fork() KJ_WARN_UNUSED_RESULT; cannam@135: // Forks the promise, so that multiple different clients can independently wait on the result. cannam@135: // `T` must be copy-constructable for this to work. Or, in the special case where `T` is cannam@135: // `Own`, `U` must have a method `Own addRef()` which returns a new reference to the same cannam@135: // (or an equivalent) object (probably implemented via reference counting). cannam@135: cannam@135: _::SplitTuplePromise split(); cannam@135: // Split a promise for a tuple into a tuple of promises. cannam@135: // cannam@135: // E.g. if you have `Promise>`, `split()` returns cannam@135: // `kj::Tuple, Promise>`. cannam@135: cannam@135: Promise exclusiveJoin(Promise&& other) KJ_WARN_UNUSED_RESULT; cannam@135: // Return a new promise that resolves when either the original promise resolves or `other` cannam@135: // resolves (whichever comes first). 
The promise that didn't resolve first is canceled. cannam@135: cannam@135: // TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions cannam@135: // and produces a tuple? cannam@135: cannam@135: template cannam@135: Promise attach(Attachments&&... attachments) KJ_WARN_UNUSED_RESULT; cannam@135: // "Attaches" one or more movable objects (often, Owns) to the promise, such that they will cannam@135: // be destroyed when the promise resolves. This is useful when a promise's callback contains cannam@135: // pointers into some object and you want to make sure the object still exists when the callback cannam@135: // runs -- after calling then(), use attach() to add necessary objects to the result. cannam@135: cannam@135: template cannam@135: Promise eagerlyEvaluate(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT; cannam@135: Promise eagerlyEvaluate(decltype(nullptr)) KJ_WARN_UNUSED_RESULT; cannam@135: // Force eager evaluation of this promise. Use this if you are going to hold on to the promise cannam@135: // for awhile without consuming the result, but you want to make sure that the system actually cannam@135: // processes it. cannam@135: // cannam@135: // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to cannam@135: // `then()`, except that it must return void. We make you specify this because otherwise it's cannam@135: // easy to forget to handle errors in a promise that you never use. You may specify nullptr for cannam@135: // the error handler if you are sure that ignoring errors is fine, or if you know that you'll cannam@135: // eventually wait on the promise somewhere. cannam@135: cannam@135: template cannam@135: void detach(ErrorFunc&& errorHandler); cannam@135: // Allows the promise to continue running in the background until it completes or the cannam@135: // `EventLoop` is destroyed. 
Be careful when using this: since you can no longer cancel this cannam@135: // promise, you need to make sure that the promise owns all the objects it touches or make sure cannam@135: // those objects outlive the EventLoop. cannam@135: // cannam@135: // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to cannam@135: // `then()`, except that it must return void. cannam@135: // cannam@135: // This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be cannam@135: // canceled unless the callee explicitly permits it. cannam@135: cannam@135: kj::String trace(); cannam@135: // Returns a dump of debug info about this promise. Not for production use. Requires RTTI. cannam@135: // This method does NOT consume the promise as other methods do. cannam@135: cannam@135: private: cannam@135: Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {} cannam@135: // Second parameter prevent ambiguity with immediate-value constructor. cannam@135: cannam@135: template cannam@135: friend class Promise; cannam@135: friend class EventLoop; cannam@135: template cannam@135: friend Promise newAdaptedPromise(Params&&... adapterConstructorParams); cannam@135: template cannam@135: friend PromiseFulfillerPair newPromiseAndFulfiller(); cannam@135: template cannam@135: friend class _::ForkHub; cannam@135: friend class _::TaskSetImpl; cannam@135: friend Promise _::yield(); cannam@135: friend class _::NeverDone; cannam@135: template cannam@135: friend Promise> joinPromises(Array>&& promises); cannam@135: friend Promise joinPromises(Array>&& promises); cannam@135: }; cannam@135: cannam@135: template cannam@135: class ForkedPromise { cannam@135: // The result of `Promise::fork()` and `EventLoop::fork()`. Allows branches to be created. cannam@135: // Like `Promise`, this is a pass-by-move type. 
cannam@135: cannam@135: public: cannam@135: inline ForkedPromise(decltype(nullptr)) {} cannam@135: cannam@135: Promise addBranch(); cannam@135: // Add a new branch to the fork. The branch is equivalent to the original promise. cannam@135: cannam@135: private: cannam@135: Own<_::ForkHub<_::FixVoid>> hub; cannam@135: cannam@135: inline ForkedPromise(bool, Own<_::ForkHub<_::FixVoid>>&& hub): hub(kj::mv(hub)) {} cannam@135: cannam@135: friend class Promise; cannam@135: friend class EventLoop; cannam@135: }; cannam@135: cannam@135: constexpr _::Void READY_NOW = _::Void(); cannam@135: // Use this when you need a Promise that is already fulfilled -- this value can be implicitly cannam@135: // cast to `Promise`. cannam@135: cannam@135: constexpr _::NeverDone NEVER_DONE = _::NeverDone(); cannam@135: // The opposite of `READY_NOW`, return this when the promise should never resolve. This can be cannam@135: // implicitly converted to any promise type. You may also call `NEVER_DONE.wait()` to wait cannam@135: // forever (useful for servers). cannam@135: cannam@135: template cannam@135: PromiseForResult evalLater(Func&& func) KJ_WARN_UNUSED_RESULT; cannam@135: // Schedule for the given zero-parameter function to be executed in the event loop at some cannam@135: // point in the near future. Returns a Promise for its result -- or, if `func()` itself returns cannam@135: // a promise, `evalLater()` returns a Promise for the result of resolving that promise. cannam@135: // cannam@135: // Example usage: cannam@135: // Promise x = evalLater([]() { return 123; }); cannam@135: // cannam@135: // The above is exactly equivalent to: cannam@135: // Promise x = Promise(READY_NOW).then([]() { return 123; }); cannam@135: // cannam@135: // If the returned promise is destroyed before the callback runs, the callback will be canceled cannam@135: // (never called). 
cannam@135: // cannam@135: // If you schedule several evaluations with `evalLater` during the same callback, they are cannam@135: // guaranteed to be executed in order. cannam@135: cannam@135: template cannam@135: PromiseForResult evalNow(Func&& func) KJ_WARN_UNUSED_RESULT; cannam@135: // Run `func()` and return a promise for its result. `func()` executes before `evalNow()` returns. cannam@135: // If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the cannam@135: // main reason why `evalNow()` is useful. cannam@135: cannam@135: template cannam@135: Promise> joinPromises(Array>&& promises); cannam@135: // Join an array of promises into a promise for an array. cannam@135: cannam@135: // ======================================================================================= cannam@135: // Hack for creating a lambda that holds an owned pointer. cannam@135: cannam@135: template cannam@135: class CaptureByMove { cannam@135: public: cannam@135: inline CaptureByMove(Func&& func, MovedParam&& param) cannam@135: : func(kj::mv(func)), param(kj::mv(param)) {} cannam@135: cannam@135: template cannam@135: inline auto operator()(Params&&... params) cannam@135: -> decltype(kj::instance()(kj::instance(), kj::fwd(params)...)) { cannam@135: return func(kj::mv(param), kj::fwd(params)...); cannam@135: } cannam@135: cannam@135: private: cannam@135: Func func; cannam@135: MovedParam param; cannam@135: }; cannam@135: cannam@135: template cannam@135: inline CaptureByMove> mvCapture(MovedParam&& param, Func&& func) { cannam@135: // Hack to create a "lambda" which captures a variable by moving it rather than copying or cannam@135: // referencing. C++14 generalized captures should make this obsolete, but for now in C++11 this cannam@135: // is commonly needed for Promise continuations that own their state. 
Example usage: cannam@135: // cannam@135: // Own ptr = makeFoo(); cannam@135: // Promise promise = callRpc(); cannam@135: // promise.then(mvCapture(ptr, [](Own&& ptr, int result) { cannam@135: // return ptr->finish(result); cannam@135: // })); cannam@135: cannam@135: return CaptureByMove>(kj::fwd(func), kj::mv(param)); cannam@135: } cannam@135: cannam@135: // ======================================================================================= cannam@135: // Advanced promise construction cannam@135: cannam@135: template cannam@135: class PromiseFulfiller { cannam@135: // A callback which can be used to fulfill a promise. Only the first call to fulfill() or cannam@135: // reject() matters; subsequent calls are ignored. cannam@135: cannam@135: public: cannam@135: virtual void fulfill(T&& value) = 0; cannam@135: // Fulfill the promise with the given value. cannam@135: cannam@135: virtual void reject(Exception&& exception) = 0; cannam@135: // Reject the promise with an error. cannam@135: cannam@135: virtual bool isWaiting() = 0; cannam@135: // Returns true if the promise is still unfulfilled and someone is potentially waiting for it. cannam@135: // Returns false if fulfill()/reject() has already been called *or* if the promise to be cannam@135: // fulfilled has been discarded and therefore the result will never be used anyway. cannam@135: cannam@135: template cannam@135: bool rejectIfThrows(Func&& func); cannam@135: // Call the function (with no arguments) and return true. If an exception is thrown, call cannam@135: // `fulfiller.reject()` and then return false. When compiled with exceptions disabled, cannam@135: // non-fatal exceptions are still detected and handled correctly. cannam@135: }; cannam@135: cannam@135: template <> cannam@135: class PromiseFulfiller { cannam@135: // Specialization of PromiseFulfiller for void promises. See PromiseFulfiller. 
cannam@135: cannam@135: public: cannam@135: virtual void fulfill(_::Void&& value = _::Void()) = 0; cannam@135: // Call with zero parameters. The parameter is a dummy that only exists so that subclasses don't cannam@135: // have to specialize for `void`. cannam@135: cannam@135: virtual void reject(Exception&& exception) = 0; cannam@135: virtual bool isWaiting() = 0; cannam@135: cannam@135: template cannam@135: bool rejectIfThrows(Func&& func); cannam@135: }; cannam@135: cannam@135: template cannam@135: Promise newAdaptedPromise(Params&&... adapterConstructorParams); cannam@135: // Creates a new promise which owns an instance of `Adapter` which encapsulates the operation cannam@135: // that will eventually fulfill the promise. This is primarily useful for adapting non-KJ cannam@135: // asynchronous APIs to use promises. cannam@135: // cannam@135: // An instance of `Adapter` will be allocated and owned by the returned `Promise`. A cannam@135: // `PromiseFulfiller&` will be passed as the first parameter to the adapter's constructor, cannam@135: // and `adapterConstructorParams` will be forwarded as the subsequent parameters. The adapter cannam@135: // is expected to perform some asynchronous operation and call the `PromiseFulfiller` once cannam@135: // it is finished. cannam@135: // cannam@135: // The adapter is destroyed when its owning Promise is destroyed. This may occur before the cannam@135: // Promise has been fulfilled. In this case, the adapter's destructor should cancel the cannam@135: // asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be cannam@135: // called. cannam@135: // cannam@135: // An adapter implementation should be carefully written to ensure that it cannot accidentally cannam@135: // be left unfulfilled permanently because of an exception. Consider making liberal use of cannam@135: // `PromiseFulfiller::rejectIfThrows()`. 
cannam@135: cannam@135: template cannam@135: struct PromiseFulfillerPair { cannam@135: Promise<_::JoinPromises> promise; cannam@135: Own> fulfiller; cannam@135: }; cannam@135: cannam@135: template cannam@135: PromiseFulfillerPair newPromiseAndFulfiller(); cannam@135: // Construct a Promise and a separate PromiseFulfiller which can be used to fulfill the promise. cannam@135: // If the PromiseFulfiller is destroyed before either of its methods are called, the Promise is cannam@135: // implicitly rejected. cannam@135: // cannam@135: // Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback cannam@135: // that there is no way to handle cancellation (i.e. detect when the Promise is discarded). cannam@135: // cannam@135: // You can arrange to fulfill a promise with another promise by using a promise type for T. E.g. cannam@135: // `newPromiseAndFulfiller>()` will produce a promise of type `Promise` but the cannam@135: // fulfiller will be of type `PromiseFulfiller>`. Thus you pass a `Promise` to the cannam@135: // `fulfill()` callback, and the promises are chained. cannam@135: cannam@135: // ======================================================================================= cannam@135: // TaskSet cannam@135: cannam@135: class TaskSet { cannam@135: // Holds a collection of Promises and ensures that each executes to completion. Memory cannam@135: // associated with each promise is automatically freed when the promise completes. Destroying cannam@135: // the TaskSet itself automatically cancels all unfinished promises. cannam@135: // cannam@135: // This is useful for "daemon" objects that perform background tasks which aren't intended to cannam@135: // fulfill any particular external promise, but which may need to be canceled (and thus can't cannam@135: // use `Promise::detach()`). The daemon object holds a TaskSet to collect these tasks it is cannam@135: // working on. 
This way, if the daemon itself is destroyed, the TaskSet is destroyed as well, cannam@135: // and everything the daemon is doing is canceled. cannam@135: cannam@135: public: cannam@135: class ErrorHandler { cannam@135: public: cannam@135: virtual void taskFailed(kj::Exception&& exception) = 0; cannam@135: }; cannam@135: cannam@135: TaskSet(ErrorHandler& errorHandler); cannam@135: // `loop` will be used to wait on promises. `errorHandler` will be executed any time a task cannam@135: // throws an exception, and will execute within the given EventLoop. cannam@135: cannam@135: ~TaskSet() noexcept(false); cannam@135: cannam@135: void add(Promise&& promise); cannam@135: cannam@135: kj::String trace(); cannam@135: // Return debug info about all promises currently in the TaskSet. cannam@135: cannam@135: private: cannam@135: Own<_::TaskSetImpl> impl; cannam@135: }; cannam@135: cannam@135: // ======================================================================================= cannam@135: // The EventLoop class cannam@135: cannam@135: class EventPort { cannam@135: // Interfaces between an `EventLoop` and events originating from outside of the loop's thread. cannam@135: // All such events come in through the `EventPort` implementation. cannam@135: // cannam@135: // An `EventPort` implementation may interface with low-level operating system APIs and/or other cannam@135: // threads. You can also write an `EventPort` which wraps some other (non-KJ) event loop cannam@135: // framework, allowing the two to coexist in a single thread. cannam@135: cannam@135: public: cannam@135: virtual bool wait() = 0; cannam@135: // Wait for an external event to arrive, sleeping if necessary. Once at least one event has cannam@135: // arrived, queue it to the event loop (e.g. by fulfilling a promise) and return. cannam@135: // cannam@135: // This is called during `Promise::wait()` whenever the event queue becomes empty, in order to cannam@135: // wait for new events to populate the queue. 
cannam@135: // cannam@135: // It is safe to return even if nothing has actually been queued, so long as calling `wait()` in cannam@135: // a loop will eventually sleep. (That is to say, false positives are fine.) cannam@135: // cannam@135: // Returns true if wake() has been called from another thread. (Precisely, returns true if cannam@135: // no previous call to `wait()` or `poll()` has returned true since `wake()` was last cannam@135: // called.) cannam@135: cannam@135: virtual bool poll() = 0; cannam@135: // Check if any external events have arrived, but do not sleep. If any events have arrived, cannam@135: // add them to the event queue (e.g. by fulfilling promises) before returning. cannam@135: // cannam@135: // This may be called during `Promise::wait()` when the EventLoop has been executing for a while cannam@135: // without a break but is still non-empty. cannam@135: // cannam@135: // Returns true if wake() has been called from another thread. (Precisely, returns true if cannam@135: // no previous call to `wait()` or `poll()` has returned true since `wake()` was last cannam@135: // called.) cannam@135: cannam@135: virtual void setRunnable(bool runnable); cannam@135: // Called to notify the `EventPort` when the `EventLoop` has work to do; specifically when it cannam@135: // transitions from empty -> runnable or runnable -> empty. This is typically useful when cannam@135: // integrating with an external event loop; if the loop is currently runnable then you should cannam@135: // arrange to call run() on it soon. The default implementation does nothing. cannam@135: cannam@135: virtual void wake() const; cannam@135: // Wake up the EventPort's thread from another thread. cannam@135: // cannam@135: // Unlike all other methods on this interface, `wake()` may be called from another thread, hence cannam@135: // it is `const`. 
cannam@135: // cannam@135: // Technically speaking, `wake()` causes the target thread to cease sleeping and not to sleep cannam@135: // again until `wait()` or `poll()` has returned true at least once. cannam@135: // cannam@135: // The default implementation throws an UNIMPLEMENTED exception. cannam@135: }; cannam@135: cannam@135: class EventLoop { cannam@135: // Represents a queue of events being executed in a loop. Most code won't interact with cannam@135: // EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the cannam@135: // documentation for `Promise`. cannam@135: // cannam@135: // Each thread can have at most one current EventLoop. To make an `EventLoop` current for cannam@135: // the thread, create a `WaitScope`. Async APIs require that the thread has a current EventLoop, cannam@135: // or they will throw exceptions. APIs that use `Promise::wait()` additionally must explicitly cannam@135: // be passed a reference to the `WaitScope` to make the caller aware that they might block. cannam@135: // cannam@135: // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g. cannam@135: // in the main() function, or in the start function of a thread. You can then use it to cannam@135: // construct some promises and wait on the result. Example: cannam@135: // cannam@135: // int main() { cannam@135: // // Create the loop, then a `WaitScope` to make it the current EventLoop for the thread. cannam@135: // MyEventPort eventPort; cannam@135: // EventLoop loop(eventPort); cannam@135: // WaitScope waitScope(loop); cannam@135: // cannam@135: // // Now we can call an async function. cannam@135: // Promise<String> textPromise = getHttp("http://example.com"); cannam@135: // cannam@135: // // And we can wait for the promise to complete. Note that you can only use `wait()` cannam@135: // // from the top level, not from inside a promise callback. 
cannam@135: // String text = textPromise.wait(waitScope); cannam@135: // print(text); cannam@135: // return 0; cannam@135: // } cannam@135: // cannam@135: // Most applications that do I/O will prefer to use `setupAsyncIo()` from `async-io.h` rather cannam@135: // than allocate an `EventLoop` directly. cannam@135: cannam@135: public: cannam@135: EventLoop(); cannam@135: // Construct an `EventLoop` which does not receive external events at all. cannam@135: cannam@135: explicit EventLoop(EventPort& port); cannam@135: // Construct an `EventLoop` which receives external events through the given `EventPort`. cannam@135: cannam@135: ~EventLoop() noexcept(false); cannam@135: cannam@135: void run(uint maxTurnCount = maxValue); cannam@135: // Run the event loop for `maxTurnCount` turns or until there is nothing left to be done, cannam@135: // whichever comes first. This never calls the `EventPort`'s `wait()` or `poll()`. It will cannam@135: // call the `EventPort`'s `setRunnable(false)` if the queue becomes empty. cannam@135: cannam@135: bool isRunnable(); cannam@135: // Returns true if run() would currently do anything, or false if the queue is empty. cannam@135: cannam@135: private: cannam@135: EventPort& port; cannam@135: cannam@135: bool running = false; cannam@135: // True while looping -- wait() is then not allowed. cannam@135: cannam@135: bool lastRunnableState = false; cannam@135: // What did we last pass to port.setRunnable()? 
cannam@135: cannam@135: _::Event* head = nullptr; cannam@135: _::Event** tail = &head; cannam@135: _::Event** depthFirstInsertPoint = &head; cannam@135: cannam@135: Own<_::TaskSetImpl> daemons; cannam@135: cannam@135: bool turn(); cannam@135: void setRunnable(bool runnable); cannam@135: void enterScope(); cannam@135: void leaveScope(); cannam@135: cannam@135: friend void _::detach(kj::Promise&& promise); cannam@135: friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, cannam@135: WaitScope& waitScope); cannam@135: friend class _::Event; cannam@135: friend class WaitScope; cannam@135: }; cannam@135: cannam@135: class WaitScope { cannam@135: // Represents a scope in which asynchronous programming can occur. A `WaitScope` should usually cannam@135: // be allocated on the stack and serves two purposes: cannam@135: // * While the `WaitScope` exists, its `EventLoop` is registered as the current loop for the cannam@135: // thread. Most operations dealing with `Promise` (including all of its methods) do not work cannam@135: // unless the thread has a current `EventLoop`. cannam@135: // * `WaitScope` may be passed to `Promise::wait()` to synchronously wait for a particular cannam@135: // promise to complete. See `Promise::wait()` for an extended discussion. cannam@135: // cannam@135: // The constructor calls `enterScope()` on the given `EventLoop` and the destructor calls cannam@135: // `leaveScope()` on it. cannam@135: cannam@135: public: cannam@135: inline explicit WaitScope(EventLoop& loop): loop(loop) { loop.enterScope(); } cannam@135: inline ~WaitScope() { loop.leaveScope(); } cannam@135: KJ_DISALLOW_COPY(WaitScope); cannam@135: cannam@135: private: cannam@135: EventLoop& loop; cannam@135: friend class EventLoop; cannam@135: friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, cannam@135: WaitScope& waitScope); cannam@135: }; cannam@135: cannam@135: } // namespace kj cannam@135: cannam@135: #include "async-inl.h" cannam@135: cannam@135: #endif // KJ_ASYNC_H_