1// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2// Licensed under the MIT License:
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20// THE SOFTWARE.
21
22#pragma once
23
24#if defined(__GNUC__) && !KJ_HEADER_WARNINGS
25#pragma GCC system_header
26#endif
27
28#include "async-prelude.h"
29#include "exception.h"
30#include "refcount.h"
31
32namespace kj {
33
34class EventLoop;
35class WaitScope;
36
37template <typename T>
38class Promise;
39template <typename T>
40class ForkedPromise;
41template <typename T>
42class PromiseFulfiller;
43template <typename T>
44struct PromiseFulfillerPair;
45
46template <typename Func, typename T>
47using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>;
48// Evaluates to the type of Promise for the result of calling functor type Func with parameter type
49// T. If T is void, then the promise is for the result of calling Func with no arguments. If
50// Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>.
51
52// =======================================================================================
53// Promises
54
55template <typename T>
56class Promise: protected _::PromiseBase {
57 // The basic primitive of asynchronous computation in KJ. Similar to "futures", but designed
58 // specifically for event loop concurrency. Similar to E promises and JavaScript Promises/A.
59 //
60 // A Promise represents a promise to produce a value of type T some time in the future. Once
61 // that value has been produced, the promise is "fulfilled". Alternatively, a promise can be
62 // "broken", with an Exception describing what went wrong. You may implicitly convert a value of
63 // type T to an already-fulfilled Promise<T>. You may implicitly convert the constant
64 // `kj::READY_NOW` to an already-fulfilled Promise<void>. You may also implicitly convert a
65 // `kj::Exception` to an already-broken promise of any type.
66 //
67 // Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed
68 // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations
69 // meant to fulfill the promise will be canceled if possible. All methods of `Promise` (unless
70 // otherwise noted) actually consume the promise in the sense of move semantics. (Arguably they
71 // should be rvalue-qualified, but at the time this interface was created compilers didn't widely
72 // support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().) If
73 // you want to use one Promise in two different places, you must fork it with `fork()`.
74 //
75 // To use the result of a Promise, you must call `then()` and supply a callback function to
76 // call with the result. `then()` returns another promise, for the result of the callback.
77 // Any time that this would result in Promise<Promise<T>>, the promises are collapsed into a
78 // simple Promise<T> that first waits for the outer promise, then the inner. Example:
79 //
80 // // Open a remote file, read the content, and then count the
81 // // number of lines of text.
82 // // Note that none of the calls here block. `file`, `content`
83 // // and `lineCount` are all initialized immediately before any
84 // // asynchronous operations occur. The lambda callbacks are
85 // // called later.
86 // Promise<Own<File>> file = openFtp("ftp://host/foo/bar");
87 // Promise<String> content = file.then(
88 // [](Own<File> file) -> Promise<String> {
89 // return file.readAll();
90 // });
91 // Promise<int> lineCount = content.then(
92 // [](String text) -> int {
93 // uint count = 0;
94 // for (char c: text) count += (c == '\n');
95 // return count;
96 // });
97 //
98 // For `then()` to work, the current thread must have an active `EventLoop`. Each callback
99 // is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current
100 // thread's event loop, you do not need to worry about two callbacks running at the same time.
101 // You will need to set up at least one `EventLoop` at the top level of your program before you
102 // can use promises.
103 //
104 // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`.
105 //
106 // Systems using promises should consider supporting the concept of "pipelining". Pipelining
107 // means allowing a caller to start issuing method calls against a promised object before the
108 // promise has actually been fulfilled. This is particularly useful if the promise is for a
109 // remote object living across a network, as this can avoid round trips when chaining a series
110 // of calls. It is suggested that any class T which supports pipelining implement a subclass of
111 // Promise<T> which adds "eventual send" methods -- methods which, when called, say "please
112 // invoke the corresponding method on the promised value once it is available". These methods
113 // should in turn return promises for the eventual results of said invocations. Cap'n Proto,
114 // for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see
115 // `capnp/capability.h`.
116 //
117 // KJ Promises are based on E promises:
118 // http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises
119 //
120 // KJ Promises are also inspired in part by the evolving standards for JavaScript/ECMAScript
121 // promises, which are themselves influenced by E promises:
122 // http://promisesaplus.com/
123 // https://github.com/domenic/promises-unwrapping
124
125public:
126 Promise(_::FixVoid<T> value);
127 // Construct an already-fulfilled Promise from a value of type T. For non-void promises, the
128 // parameter type is simply T. So, e.g., in a function that returns `Promise<int>`, you can
129 // say `return 123;` to return a promise that is already fulfilled to 123.
130 //
131 // For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`.
132
133 Promise(kj::Exception&& e);
134 // Construct an already-broken Promise.
135
136 inline Promise(decltype(nullptr)) {}
137
138 template <typename Func, typename ErrorFunc = _::PropagateException>
139 PromiseForResult<Func, T> then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException())
140 KJ_WARN_UNUSED_RESULT;
141 // Register a continuation function to be executed when the promise completes. The continuation
142 // (`func`) takes the promised value (an rvalue of type `T`) as its parameter. The continuation
143 // may return a new value; `then()` itself returns a promise for the continuation's eventual
144 // result. If the continuation itself returns a `Promise<U>`, then `then()` shall also return
145 // a `Promise<U>` which first waits for the original promise, then executes the continuation,
146 // then waits for the inner promise (i.e. it automatically "unwraps" the promise).
147 //
148 // In all cases, `then()` returns immediately. The continuation is executed later. The
149 // continuation is always executed on the same EventLoop (and, therefore, the same thread) which
150 // called `then()`, therefore no synchronization is necessary on state shared by the continuation
151 // and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws
152 // an exception.
153 //
154 // You may also specify an error handler continuation as the second parameter. `errorHandler`
155 // must be a functor taking a parameter of type `kj::Exception&&`. It must return the same
156 // type as `func` returns (except when `func` returns `Promise<U>`, in which case `errorHandler`
157 // may return either `Promise<U>` or just `U`). The default error handler simply propagates the
158 // exception to the returned promise.
159 //
160 // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise
161 // is broken. When compiled with -fno-exceptions, the framework will still detect when a
162 // recoverable exception was thrown inside of a continuation and will consider the promise
163 // broken even though a (presumably garbage) result was returned.
164 //
165 // If the returned promise is destroyed before the callback runs, the callback will be canceled
166 // (it will never run).
167 //
168 // Note that `then()` -- like all other Promise methods -- consumes the promise on which it is
169 // called, in the sense of move semantics. After returning, the original promise is no longer
170 // valid, but `then()` returns a new promise.
171 //
172 // *Advanced implementation tips:* Most users will never need to worry about the below, but
173 // it is good to be aware of.
174 //
175 // As an optimization, if the callback function `func` does _not_ return another promise, then
176 // execution of `func` itself may be delayed until its result is known to be needed. The
177 // expectation here is that `func` is just doing some transformation on the results, not
178 // scheduling any other actions, therefore the system doesn't need to be proactive about
179 // evaluating it. This way, a chain of trivial then() transformations can be executed all at
180 // once without repeatedly re-scheduling through the event loop. Use the `eagerlyEvaluate()`
181 // method to suppress this behavior.
182 //
183 // On the other hand, if `func` _does_ return another promise, then the system evaluates `func`
184 // as soon as possible, because the promise it returns might be for a newly-scheduled
185 // long-running asynchronous task.
186 //
187 // As another optimization, when a callback function registered with `then()` is actually
188 // scheduled, it is scheduled to occur immediately, preempting other work in the event queue.
189 // This allows a long chain of `then`s to execute all at once, improving cache locality by
190 // clustering operations on the same data. However, this implies that starvation can occur
191 // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for
192 // actual I/O. To solve this, use `kj::evalLater()` to yield control; this way, all other events
193 // in the queue will get a chance to run before your callback is executed.
194
195 Promise<void> ignoreResult() KJ_WARN_UNUSED_RESULT { return then([](T&&) {}); }
196 // Convenience method to convert the promise to a void promise by ignoring the return value.
197 //
198 // You must still wait on the returned promise if you want the task to execute.
199
200 template <typename ErrorFunc>
201 Promise<T> catch_(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT;
202 // Equivalent to `.then(identityFunc, errorHandler)`, where `identifyFunc` is a function that
203 // just returns its input.
204
205 T wait(WaitScope& waitScope);
206 // Run the event loop until the promise is fulfilled, then return its result. If the promise
207 // is rejected, throw an exception.
208 //
209 // wait() is primarily useful at the top level of a program -- typically, within the function
210 // that allocated the EventLoop. For example, a program that performs one or two RPCs and then
211 // exits would likely use wait() in its main() function to wait on each RPC. On the other hand,
212 // server-side code generally cannot use wait(), because it has to be able to accept multiple
213 // requests at once.
214 //
215 // If the promise is rejected, `wait()` throws an exception. If the program was compiled without
216 // exceptions (-fno-exceptions), this will usually abort. In this case you really should first
217 // use `then()` to set an appropriate handler for the exception case, so that the promise you
218 // actually wait on never throws.
219 //
220 // `waitScope` is an object proving that the caller is in a scope where wait() is allowed. By
221 // convention, any function which might call wait(), or which might call another function which
222 // might call wait(), must take `WaitScope&` as one of its parameters. This is needed for two
223 // reasons:
224 // * `wait()` is not allowed during an event callback, because event callbacks are themselves
225 // called during some other `wait()`, and such recursive `wait()`s would only be able to
226 // complete in LIFO order, which might mean that the outer `wait()` ends up waiting longer
227 // than it is supposed to. To prevent this, a `WaitScope` cannot be constructed or used during
228 // an event callback.
229 // * Since `wait()` runs the event loop, unrelated event callbacks may execute before `wait()`
230 // returns. This means that anyone calling `wait()` must be reentrant -- state may change
231 // around them in arbitrary ways. Therefore, callers really need to know if a function they
232 // are calling might wait(), and the `WaitScope&` parameter makes this clear.
233 //
234 // TODO(someday): Implement fibers, and let them call wait() even when they are handling an
235 // event.
236
237 bool poll(WaitScope& waitScope);
238 // Returns true if a call to wait() would complete without blocking, false if it would block.
239 //
240 // If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an
241 // attempt to resolve it. Only when there is nothing left to do will it return false.
242 //
243 // Generally, poll() is most useful in tests. Often, you may want to verify that a promise does
244 // not resolve until some specific event occurs. To do so, poll() the promise before the event to
245 // verify it isn't resolved, then trigger the event, then poll() again to verify that it resolves.
246 // The first poll() verifies that the promise doesn't resolve early, which would otherwise be
247 // hard to do deterministically. The second poll() allows you to check that the promise has
248 // resolved and avoid a wait() that might deadlock in the case that it hasn't.
249
250 ForkedPromise<T> fork() KJ_WARN_UNUSED_RESULT;
251 // Forks the promise, so that multiple different clients can independently wait on the result.
252 // `T` must be copy-constructable for this to work. Or, in the special case where `T` is
253 // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same
254 // (or an equivalent) object (probably implemented via reference counting).
255
256 _::SplitTuplePromise<T> split();
257 // Split a promise for a tuple into a tuple of promises.
258 //
259 // E.g. if you have `Promise<kj::Tuple<T, U>>`, `split()` returns
260 // `kj::Tuple<Promise<T>, Promise<U>>`.
261
262 Promise<T> exclusiveJoin(Promise<T>&& other) KJ_WARN_UNUSED_RESULT;
263 // Return a new promise that resolves when either the original promise resolves or `other`
264 // resolves (whichever comes first). The promise that didn't resolve first is canceled.
265
266 // TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions
267 // and produces a tuple?
268
269 template <typename... Attachments>
270 Promise<T> attach(Attachments&&... attachments) KJ_WARN_UNUSED_RESULT;
271 // "Attaches" one or more movable objects (often, Own<T>s) to the promise, such that they will
272 // be destroyed when the promise resolves. This is useful when a promise's callback contains
273 // pointers into some object and you want to make sure the object still exists when the callback
274 // runs -- after calling then(), use attach() to add necessary objects to the result.
275
276 template <typename ErrorFunc>
277 Promise<T> eagerlyEvaluate(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT;
278 Promise<T> eagerlyEvaluate(decltype(nullptr)) KJ_WARN_UNUSED_RESULT;
279 // Force eager evaluation of this promise. Use this if you are going to hold on to the promise
280 // for awhile without consuming the result, but you want to make sure that the system actually
281 // processes it.
282 //
283 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
284 // `then()`, or the parameter to `catch_()`. We make you specify this because otherwise it's
285 // easy to forget to handle errors in a promise that you never use. You may specify nullptr for
286 // the error handler if you are sure that ignoring errors is fine, or if you know that you'll
287 // eventually wait on the promise somewhere.
288
289 template <typename ErrorFunc>
290 void detach(ErrorFunc&& errorHandler);
291 // Allows the promise to continue running in the background until it completes or the
292 // `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this
293 // promise, you need to make sure that the promise owns all the objects it touches or make sure
294 // those objects outlive the EventLoop.
295 //
296 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
297 // `then()`, except that it must return void.
298 //
299 // This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
300 // canceled unless the callee explicitly permits it.
301
302 kj::String trace();
303 // Returns a dump of debug info about this promise. Not for production use. Requires RTTI.
304 // This method does NOT consume the promise as other methods do.
305
306private:
307 Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {}
308 // Second parameter prevent ambiguity with immediate-value constructor.
309
310 template <typename>
311 friend class Promise;
312 friend class EventLoop;
313 template <typename U, typename Adapter, typename... Params>
314 friend Promise<U> newAdaptedPromise(Params&&... adapterConstructorParams);
315 template <typename U>
316 friend PromiseFulfillerPair<U> newPromiseAndFulfiller();
317 template <typename>
318 friend class _::ForkHub;
319 friend class TaskSet;
320 friend Promise<void> _::yield();
321 friend class _::NeverDone;
322 template <typename U>
323 friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises);
324 friend Promise<void> joinPromises(Array<Promise<void>>&& promises);
325};
326
327template <typename T>
328class ForkedPromise {
329 // The result of `Promise::fork()` and `EventLoop::fork()`. Allows branches to be created.
330 // Like `Promise<T>`, this is a pass-by-move type.
331
332public:
333 inline ForkedPromise(decltype(nullptr)) {}
334
335 Promise<T> addBranch();
336 // Add a new branch to the fork. The branch is equivalent to the original promise.
337
338private:
339 Own<_::ForkHub<_::FixVoid<T>>> hub;
340
341 inline ForkedPromise(bool, Own<_::ForkHub<_::FixVoid<T>>>&& hub): hub(kj::mv(hub)) {}
342
343 friend class Promise<T>;
344 friend class EventLoop;
345};
346
347constexpr _::Void READY_NOW = _::Void();
348// Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly
349// cast to `Promise<void>`.
350
351constexpr _::NeverDone NEVER_DONE = _::NeverDone();
352// The opposite of `READY_NOW`, return this when the promise should never resolve. This can be
353// implicitly converted to any promise type. You may also call `NEVER_DONE.wait()` to wait
354// forever (useful for servers).
355
356template <typename Func>
357PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT;
358// Schedule for the given zero-parameter function to be executed in the event loop at some
359// point in the near future. Returns a Promise for its result -- or, if `func()` itself returns
360// a promise, `evalLater()` returns a Promise for the result of resolving that promise.
361//
362// Example usage:
363// Promise<int> x = evalLater([]() { return 123; });
364//
365// The above is exactly equivalent to:
366// Promise<int> x = Promise<void>(READY_NOW).then([]() { return 123; });
367//
368// If the returned promise is destroyed before the callback runs, the callback will be canceled
369// (never called).
370//
371// If you schedule several evaluations with `evalLater` during the same callback, they are
372// guaranteed to be executed in order.
373
374template <typename Func>
375PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT;
376// Run `func()` and return a promise for its result. `func()` executes before `evalNow()` returns.
377// If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the
378// main reason why `evalNow()` is useful.
379
380template <typename T>
381Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
382// Join an array of promises into a promise for an array.
383
384// =======================================================================================
385// Hack for creating a lambda that holds an owned pointer.
386
387template <typename Func, typename MovedParam>
388class CaptureByMove {
389public:
390 inline CaptureByMove(Func&& func, MovedParam&& param)
391 : func(kj::mv(func)), param(kj::mv(param)) {}
392
393 template <typename... Params>
394 inline auto operator()(Params&&... params)
395 -> decltype(kj::instance<Func>()(kj::instance<MovedParam&&>(), kj::fwd<Params>(params)...)) {
396 return func(kj::mv(param), kj::fwd<Params>(params)...);
397 }
398
399private:
400 Func func;
401 MovedParam param;
402};
403
404template <typename Func, typename MovedParam>
405inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func) {
406 // Hack to create a "lambda" which captures a variable by moving it rather than copying or
407 // referencing. C++14 generalized captures should make this obsolete, but for now in C++11 this
408 // is commonly needed for Promise continuations that own their state. Example usage:
409 //
410 // Own<Foo> ptr = makeFoo();
411 // Promise<int> promise = callRpc();
412 // promise.then(mvCapture(ptr, [](Own<Foo>&& ptr, int result) {
413 // return ptr->finish(result);
414 // }));
415
416 return CaptureByMove<Func, Decay<MovedParam>>(kj::fwd<Func>(func), kj::mv(param));
417}
418
419// =======================================================================================
420// Advanced promise construction
421
422template <typename T>
423class PromiseFulfiller {
424 // A callback which can be used to fulfill a promise. Only the first call to fulfill() or
425 // reject() matters; subsequent calls are ignored.
426
427public:
428 virtual void fulfill(T&& value) = 0;
429 // Fulfill the promise with the given value.
430
431 virtual void reject(Exception&& exception) = 0;
432 // Reject the promise with an error.
433
434 virtual bool isWaiting() = 0;
435 // Returns true if the promise is still unfulfilled and someone is potentially waiting for it.
436 // Returns false if fulfill()/reject() has already been called *or* if the promise to be
437 // fulfilled has been discarded and therefore the result will never be used anyway.
438
439 template <typename Func>
440 bool rejectIfThrows(Func&& func);
441 // Call the function (with no arguments) and return true. If an exception is thrown, call
442 // `fulfiller.reject()` and then return false. When compiled with exceptions disabled,
443 // non-fatal exceptions are still detected and handled correctly.
444};
445
446template <>
447class PromiseFulfiller<void> {
448 // Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>.
449
450public:
451 virtual void fulfill(_::Void&& value = _::Void()) = 0;
452 // Call with zero parameters. The parameter is a dummy that only exists so that subclasses don't
453 // have to specialize for <void>.
454
455 virtual void reject(Exception&& exception) = 0;
456 virtual bool isWaiting() = 0;
457
458 template <typename Func>
459 bool rejectIfThrows(Func&& func);
460};
461
462template <typename T, typename Adapter, typename... Params>
463Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams);
464// Creates a new promise which owns an instance of `Adapter` which encapsulates the operation
465// that will eventually fulfill the promise. This is primarily useful for adapting non-KJ
466// asynchronous APIs to use promises.
467//
468// An instance of `Adapter` will be allocated and owned by the returned `Promise`. A
469// `PromiseFulfiller<T>&` will be passed as the first parameter to the adapter's constructor,
470// and `adapterConstructorParams` will be forwarded as the subsequent parameters. The adapter
471// is expected to perform some asynchronous operation and call the `PromiseFulfiller<T>` once
472// it is finished.
473//
474// The adapter is destroyed when its owning Promise is destroyed. This may occur before the
475// Promise has been fulfilled. In this case, the adapter's destructor should cancel the
476// asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be
477// called.
478//
479// An adapter implementation should be carefully written to ensure that it cannot accidentally
480// be left unfulfilled permanently because of an exception. Consider making liberal use of
481// `PromiseFulfiller<T>::rejectIfThrows()`.
482
483template <typename T>
484struct PromiseFulfillerPair {
485 _::ReducePromises<T> promise;
486 Own<PromiseFulfiller<T>> fulfiller;
487};
488
489template <typename T>
490PromiseFulfillerPair<T> newPromiseAndFulfiller();
491// Construct a Promise and a separate PromiseFulfiller which can be used to fulfill the promise.
492// If the PromiseFulfiller is destroyed before either of its methods are called, the Promise is
493// implicitly rejected.
494//
495// Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback
496// that there is no way to handle cancellation (i.e. detect when the Promise is discarded).
497//
498// You can arrange to fulfill a promise with another promise by using a promise type for T. E.g.
499// `newPromiseAndFulfiller<Promise<U>>()` will produce a promise of type `Promise<U>` but the
500// fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the
501// `fulfill()` callback, and the promises are chained.
502
503// =======================================================================================
504// Canceler
505
506class Canceler {
507 // A Canceler can wrap some set of Promises and then forcefully cancel them on-demand, or
508 // implicitly when the Canceler is destroyed.
509 //
510 // The cancellation is done in such a way that once cancel() (or the Canceler's destructor)
511 // returns, it's guaranteed that the promise has already been canceled and destroyed. This
512 // guarantee is important for enforcing ownership constraints. For example, imagine that Alice
513 // calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's
514 // internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed
515 // at random without Alice having canceled the promise. In this case, it is necessary for Bob to
516 // ensure that the promise will be forcefully canceled. Bob can do this by constructing a
517 // Canceler and using it to wrap promises before returning them to callers. When Bob is
518 // destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors.
519 //
520 // Note that another common strategy for cancelation is to use exclusiveJoin() to join a promise
521 // with some "cancellation promise" which only resolves if the operation should be canceled. The
522 // cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus
523 // calling the PromiseFulfiller cancels the operation. There is a major problem with this
524 // approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the
525 // exclusive-joined promise actually resolves and cancels its other fork. During that time, the
526 // task might continue to execute. If it holds pointers to objects that have been destroyed, this
527 // might cause segfaults. Thus, it is safer to use a Canceler.
528
529public:
530 inline Canceler() {}
531 ~Canceler() noexcept(false);
532 KJ_DISALLOW_COPY(Canceler);
533
534 template <typename T>
535 Promise<T> wrap(Promise<T> promise) {
536 return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise));
537 }
538
539 void cancel(StringPtr cancelReason);
540 void cancel(const Exception& exception);
541 // Cancel all previously-wrapped promises that have not already completed, causing them to throw
542 // the given exception. If you provide just a description message instead of an exception, then
543 // an exception object will be constructed from it -- but only if there are requests to cancel.
544
545 void release();
546 // Releases previously-wrapped promises, so that they will not be canceled regardless of what
547 // happens to this Canceler.
548
549 bool isEmpty() const { return list == nullptr; }
550 // Indicates if any previously-wrapped promises are still executing. (If this returns false, then
551 // cancel() would be a no-op.)
552
553private:
554 class AdapterBase {
555 public:
556 AdapterBase(Canceler& canceler);
557 ~AdapterBase() noexcept(false);
558
559 virtual void cancel(Exception&& e) = 0;
560
561 private:
562 Maybe<Maybe<AdapterBase&>&> prev;
563 Maybe<AdapterBase&> next;
564 friend class Canceler;
565 };
566
567 template <typename T>
568 class AdapterImpl: public AdapterBase {
569 public:
570 AdapterImpl(PromiseFulfiller<T>& fulfiller,
571 Canceler& canceler, Promise<T> inner)
572 : AdapterBase(canceler),
573 fulfiller(fulfiller),
574 inner(inner.then(
575 [&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); },
576 [&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); })
577 .eagerlyEvaluate(nullptr)) {}
578
579 void cancel(Exception&& e) override {
580 fulfiller.reject(kj::mv(e));
581 inner = nullptr;
582 }
583
584 private:
585 PromiseFulfiller<T>& fulfiller;
586 Promise<void> inner;
587 };
588
589 Maybe<AdapterBase&> list;
590};
591
592template <>
593class Canceler::AdapterImpl<void>: public AdapterBase {
594public:
595 AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
596 Canceler& canceler, kj::Promise<void> inner);
597 void cancel(kj::Exception&& e) override;
598 // These must be defined in async.c++ to prevent translation units compiled by MSVC from trying to
599 // link with symbols defined in async.c++ merely because they included async.h.
600
601private:
602 kj::PromiseFulfiller<void>& fulfiller;
603 kj::Promise<void> inner;
604};
605
606// =======================================================================================
607// TaskSet
608
609class TaskSet {
610 // Holds a collection of Promise<void>s and ensures that each executes to completion. Memory
611 // associated with each promise is automatically freed when the promise completes. Destroying
612 // the TaskSet itself automatically cancels all unfinished promises.
613 //
614 // This is useful for "daemon" objects that perform background tasks which aren't intended to
615 // fulfill any particular external promise, but which may need to be canceled (and thus can't
616 // use `Promise::detach()`). The daemon object holds a TaskSet to collect these tasks it is
617 // working on. This way, if the daemon itself is destroyed, the TaskSet is detroyed as well,
618 // and everything the daemon is doing is canceled.
619
620public:
621 class ErrorHandler {
622 public:
623 virtual void taskFailed(kj::Exception&& exception) = 0;
624 };
625
626 TaskSet(ErrorHandler& errorHandler);
627 // `errorHandler` will be executed any time a task throws an exception, and will execute within
628 // the given EventLoop.
629
630 ~TaskSet() noexcept(false);
631
632 void add(Promise<void>&& promise);
633
634 kj::String trace();
635 // Return debug info about all promises currently in the TaskSet.
636
637 bool isEmpty() { return tasks == nullptr; }
638 // Check if any tasks are running.
639
640 Promise<void> onEmpty();
641 // Returns a promise that fulfills the next time the TaskSet is empty. Only one such promise can
642 // exist at a time.
643
644private:
645 class Task;
646
647 TaskSet::ErrorHandler& errorHandler;
648 Maybe<Own<Task>> tasks;
649 Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller;
650};
651
652// =======================================================================================
653// The EventLoop class
654
655class EventPort {
656 // Interfaces between an `EventLoop` and events originating from outside of the loop's thread.
657 // All such events come in through the `EventPort` implementation.
658 //
659 // An `EventPort` implementation may interface with low-level operating system APIs and/or other
660 // threads. You can also write an `EventPort` which wraps some other (non-KJ) event loop
661 // framework, allowing the two to coexist in a single thread.
662
663public:
664 virtual bool wait() = 0;
665 // Wait for an external event to arrive, sleeping if necessary. Once at least one event has
666 // arrived, queue it to the event loop (e.g. by fulfilling a promise) and return.
667 //
668 // This is called during `Promise::wait()` whenever the event queue becomes empty, in order to
669 // wait for new events to populate the queue.
670 //
671 // It is safe to return even if nothing has actually been queued, so long as calling `wait()` in
672 // a loop will eventually sleep. (That is to say, false positives are fine.)
673 //
674 // Returns true if wake() has been called from another thread. (Precisely, returns true if
675 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last
676 // called.)
677
678 virtual bool poll() = 0;
679 // Check if any external events have arrived, but do not sleep. If any events have arrived,
680 // add them to the event queue (e.g. by fulfilling promises) before returning.
681 //
682 // This may be called during `Promise::wait()` when the EventLoop has been executing for a while
683 // without a break but is still non-empty.
684 //
685 // Returns true if wake() has been called from another thread. (Precisely, returns true if
686 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last
687 // called.)
688
689 virtual void setRunnable(bool runnable);
690 // Called to notify the `EventPort` when the `EventLoop` has work to do; specifically when it
691 // transitions from empty -> runnable or runnable -> empty. This is typically useful when
692 // integrating with an external event loop; if the loop is currently runnable then you should
693 // arrange to call run() on it soon. The default implementation does nothing.
694
695 virtual void wake() const;
696 // Wake up the EventPort's thread from another thread.
697 //
698 // Unlike all other methods on this interface, `wake()` may be called from another thread, hence
699 // it is `const`.
700 //
701 // Technically speaking, `wake()` causes the target thread to cease sleeping and not to sleep
702 // again until `wait()` or `poll()` has returned true at least once.
703 //
704 // The default implementation throws an UNIMPLEMENTED exception.
705};
706
707class EventLoop {
708 // Represents a queue of events being executed in a loop. Most code won't interact with
709 // EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the
710 // documentation for `Promise`.
711 //
712 // Each thread can have at most one current EventLoop. To make an `EventLoop` current for
713 // the thread, create a `WaitScope`. Async APIs require that the thread has a current EventLoop,
714 // or they will throw exceptions. APIs that use `Promise::wait()` additionally must explicitly
715 // be passed a reference to the `WaitScope` to make the caller aware that they might block.
716 //
717 // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g.
718 // in the main() function, or in the start function of a thread. You can then use it to
719 // construct some promises and wait on the result. Example:
720 //
721 // int main() {
722 // // `loop` becomes the official EventLoop for the thread.
723 // MyEventPort eventPort;
724 // EventLoop loop(eventPort);
725 //
726 // // Now we can call an async function.
727 // Promise<String> textPromise = getHttp("http://example.com");
728 //
729 // // And we can wait for the promise to complete. Note that you can only use `wait()`
730 // // from the top level, not from inside a promise callback.
731 // String text = textPromise.wait();
732 // print(text);
733 // return 0;
734 // }
735 //
736 // Most applications that do I/O will prefer to use `setupAsyncIo()` from `async-io.h` rather
737 // than allocate an `EventLoop` directly.
738
739public:
740 EventLoop();
741 // Construct an `EventLoop` which does not receive external events at all.
742
743 explicit EventLoop(EventPort& port);
744 // Construct an `EventLoop` which receives external events through the given `EventPort`.
745
746 ~EventLoop() noexcept(false);
747
748 void run(uint maxTurnCount = maxValue);
749 // Run the event loop for `maxTurnCount` turns or until there is nothing left to be done,
750 // whichever comes first. This never calls the `EventPort`'s `sleep()` or `poll()`. It will
751 // call the `EventPort`'s `setRunnable(false)` if the queue becomes empty.
752
753 bool isRunnable();
754 // Returns true if run() would currently do anything, or false if the queue is empty.
755
756private:
757 EventPort& port;
758
759 bool running = false;
760 // True while looping -- wait() is then not allowed.
761
762 bool lastRunnableState = false;
763 // What did we last pass to port.setRunnable()?
764
765 _::Event* head = nullptr;
766 _::Event** tail = &head;
767 _::Event** depthFirstInsertPoint = &head;
768
769 Own<TaskSet> daemons;
770
771 bool turn();
772 void setRunnable(bool runnable);
773 void enterScope();
774 void leaveScope();
775
776 friend void _::detach(kj::Promise<void>&& promise);
777 friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
778 WaitScope& waitScope);
779 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
780 friend class _::Event;
781 friend class WaitScope;
782};
783
784class WaitScope {
785 // Represents a scope in which asynchronous programming can occur. A `WaitScope` should usually
786 // be allocated on the stack and serves two purposes:
787 // * While the `WaitScope` exists, its `EventLoop` is registered as the current loop for the
788 // thread. Most operations dealing with `Promise` (including all of its methods) do not work
789 // unless the thread has a current `EventLoop`.
790 // * `WaitScope` may be passed to `Promise::wait()` to synchronously wait for a particular
791 // promise to complete. See `Promise::wait()` for an extended discussion.
792
793public:
794 inline explicit WaitScope(EventLoop& loop): loop(loop) { loop.enterScope(); }
795 inline ~WaitScope() { loop.leaveScope(); }
796 KJ_DISALLOW_COPY(WaitScope);
797
798 void poll();
799 // Pumps the event queue and polls for I/O until there's nothing left to do (without blocking).
800
801 void setBusyPollInterval(uint count) { busyPollInterval = count; }
802 // Set the maximum number of events to run in a row before calling poll() on the EventPort to
803 // check for new I/O.
804
805private:
806 EventLoop& loop;
807 uint busyPollInterval = kj::maxValue;
808 friend class EventLoop;
809 friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
810 WaitScope& waitScope);
811 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
812};
813
814} // namespace kj
815
816#define KJ_ASYNC_H_INCLUDED
817#include "async-inl.h"
818