1 | // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors |
2 | // Licensed under the MIT License: |
3 | // |
4 | // Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | // of this software and associated documentation files (the "Software"), to deal |
6 | // in the Software without restriction, including without limitation the rights |
7 | // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
8 | // copies of the Software, and to permit persons to whom the Software is |
9 | // furnished to do so, subject to the following conditions: |
10 | // |
11 | // The above copyright notice and this permission notice shall be included in |
12 | // all copies or substantial portions of the Software. |
13 | // |
14 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
19 | // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
20 | // THE SOFTWARE. |
21 | |
22 | // This file contains extended inline implementation details that are required along with async.h. |
23 | // We move this all into a separate file to make async.h more readable. |
24 | // |
25 | // Non-inline declarations here are defined in async.c++. |
26 | |
27 | #pragma once |
28 | |
29 | #if defined(__GNUC__) && !KJ_HEADER_WARNINGS |
30 | #pragma GCC system_header |
31 | #endif |
32 | |
33 | #ifndef KJ_ASYNC_H_INCLUDED |
34 | #error "Do not include this directly; include kj/async.h." |
35 | #include "async.h" // help IDE parse this file |
36 | #endif |
37 | |
38 | namespace kj { |
39 | namespace _ { // private |
40 | |
41 | template <typename T> |
42 | class ExceptionOr; |
43 | |
44 | class ExceptionOrValue { |
45 | public: |
46 | ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {} |
47 | KJ_DISALLOW_COPY(ExceptionOrValue); |
48 | |
49 | void addException(Exception&& exception) { |
50 | if (this->exception == nullptr) { |
51 | this->exception = kj::mv(exception); |
52 | } |
53 | } |
54 | |
55 | template <typename T> |
56 | ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); } |
57 | template <typename T> |
58 | const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); } |
59 | |
60 | Maybe<Exception> exception; |
61 | |
62 | protected: |
63 | // Allow subclasses to have move constructor / assignment. |
64 | ExceptionOrValue() = default; |
65 | ExceptionOrValue(ExceptionOrValue&& other) = default; |
66 | ExceptionOrValue& operator=(ExceptionOrValue&& other) = default; |
67 | }; |
68 | |
69 | template <typename T> |
70 | class ExceptionOr: public ExceptionOrValue { |
71 | public: |
72 | ExceptionOr() = default; |
73 | ExceptionOr(T&& value): value(kj::mv(value)) {} |
74 | ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {} |
75 | ExceptionOr(ExceptionOr&&) = default; |
76 | ExceptionOr& operator=(ExceptionOr&&) = default; |
77 | |
78 | Maybe<T> value; |
79 | }; |
80 | |
81 | class Event { |
82 | // An event waiting to be executed. Not for direct use by applications -- promises use this |
83 | // internally. |
84 | |
85 | public: |
86 | Event(); |
87 | ~Event() noexcept(false); |
88 | KJ_DISALLOW_COPY(Event); |
89 | |
90 | void armDepthFirst(); |
91 | // Enqueue this event so that `fire()` will be called from the event loop soon. |
92 | // |
93 | // Events scheduled in this way are executed in depth-first order: if an event callback arms |
94 | // more events, those events are placed at the front of the queue (in the order in which they |
95 | // were armed), so that they run immediately after the first event's callback returns. |
96 | // |
97 | // Depth-first event scheduling is appropriate for events that represent simple continuations |
98 | // of a previous event that should be globbed together for performance. Depth-first scheduling |
99 | // can lead to starvation, so any long-running task must occasionally yield with |
100 | // `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses |
101 | // breadth-first.) |
102 | // |
103 | // To use breadth-first scheduling instead, use `armBreadthFirst()`. |
104 | |
105 | void armBreadthFirst(); |
106 | // Like `armDepthFirst()` except that the event is placed at the end of the queue. |
107 | |
108 | kj::String trace(); |
109 | // Dump debug info about this event. |
110 | |
111 | virtual _::PromiseNode* getInnerForTrace(); |
112 | // If this event wraps a PromiseNode, get that node. Used for debug tracing. |
113 | // Default implementation returns nullptr. |
114 | |
115 | protected: |
116 | virtual Maybe<Own<Event>> fire() = 0; |
117 | // Fire the event. Possibly returns a pointer to itself, which will be discarded by the |
118 | // caller. This is the only way that an event can delete itself as a result of firing, as |
119 | // doing so from within fire() will throw an exception. |
120 | |
121 | private: |
122 | friend class kj::EventLoop; |
123 | EventLoop& loop; |
124 | Event* next; |
125 | Event** prev; |
126 | bool firing = false; |
127 | }; |
128 | |
129 | class PromiseNode { |
130 | // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations. |
131 | // |
132 | // To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky |
133 | // use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only |
134 | // so down-cast in the few places that really need to be templated. Luckily this is all |
135 | // internal implementation details. |
136 | |
137 | public: |
138 | virtual void onReady(Event* event) noexcept = 0; |
139 | // Arms the given event when ready. |
140 | // |
141 | // May be called multiple times. If called again before the event was armed, the old event will |
142 | // never be armed, only the new one. If called again after the event was armed, the new event |
143 | // will be armed immediately. Can be called with nullptr to un-register the existing event. |
144 | |
145 | virtual void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept; |
146 | // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own |
147 | // this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses |
148 | // this to shorten redundant chains. The default implementation does nothing; only |
149 | // ChainPromiseNode should implement this. |
150 | |
151 | virtual void get(ExceptionOrValue& output) noexcept = 0; |
152 | // Get the result. `output` points to an ExceptionOr<T> into which the result will be written. |
153 | // Can only be called once, and only after the node is ready. Must be called directly from the |
154 | // event loop, with no application code on the stack. |
155 | |
156 | virtual PromiseNode* getInnerForTrace(); |
157 | // If this node wraps some other PromiseNode, get the wrapped node. Used for debug tracing. |
158 | // Default implementation returns nullptr. |
159 | |
160 | protected: |
161 | class OnReadyEvent { |
162 | // Helper class for implementing onReady(). |
163 | |
164 | public: |
165 | void init(Event* newEvent); |
166 | |
167 | void arm(); |
168 | // Arms the event if init() has already been called and makes future calls to init() |
169 | // automatically arm the event. |
170 | |
171 | private: |
172 | Event* event = nullptr; |
173 | }; |
174 | }; |
175 | |
176 | // ------------------------------------------------------------------- |
177 | |
178 | class ImmediatePromiseNodeBase: public PromiseNode { |
179 | public: |
180 | ImmediatePromiseNodeBase(); |
181 | ~ImmediatePromiseNodeBase() noexcept(false); |
182 | |
183 | void onReady(Event* event) noexcept override; |
184 | }; |
185 | |
186 | template <typename T> |
187 | class ImmediatePromiseNode final: public ImmediatePromiseNodeBase { |
188 | // A promise that has already been resolved to an immediate value or exception. |
189 | |
190 | public: |
191 | ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {} |
192 | |
193 | void get(ExceptionOrValue& output) noexcept override { |
194 | output.as<T>() = kj::mv(result); |
195 | } |
196 | |
197 | private: |
198 | ExceptionOr<T> result; |
199 | }; |
200 | |
201 | class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase { |
202 | public: |
203 | ImmediateBrokenPromiseNode(Exception&& exception); |
204 | |
205 | void get(ExceptionOrValue& output) noexcept override; |
206 | |
207 | private: |
208 | Exception exception; |
209 | }; |
210 | |
211 | // ------------------------------------------------------------------- |
212 | |
213 | class AttachmentPromiseNodeBase: public PromiseNode { |
214 | public: |
215 | AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency); |
216 | |
217 | void onReady(Event* event) noexcept override; |
218 | void get(ExceptionOrValue& output) noexcept override; |
219 | PromiseNode* getInnerForTrace() override; |
220 | |
221 | private: |
222 | Own<PromiseNode> dependency; |
223 | |
224 | void dropDependency(); |
225 | |
226 | template <typename> |
227 | friend class AttachmentPromiseNode; |
228 | }; |
229 | |
230 | template <typename Attachment> |
231 | class AttachmentPromiseNode final: public AttachmentPromiseNodeBase { |
232 | // A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable |
233 | // object) until the promise resolves. |
234 | |
235 | public: |
236 | AttachmentPromiseNode(Own<PromiseNode>&& dependency, Attachment&& attachment) |
237 | : AttachmentPromiseNodeBase(kj::mv(dependency)), |
238 | attachment(kj::mv<Attachment>(attachment)) {} |
239 | |
240 | ~AttachmentPromiseNode() noexcept(false) { |
241 | // We need to make sure the dependency is deleted before we delete the attachment because the |
242 | // dependency may be using the attachment. |
243 | dropDependency(); |
244 | } |
245 | |
246 | private: |
247 | Attachment attachment; |
248 | }; |
249 | |
250 | // ------------------------------------------------------------------- |
251 | |
252 | #if __GNUC__ >= 8 && !__clang__ |
253 | // GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no |
254 | // "legal" way for us to extract the contetn of a PTMF so too bad. |
255 | #pragma GCC diagnostic push |
256 | #pragma GCC diagnostic ignored "-Wclass-memaccess" |
257 | #endif |
258 | |
259 | class PtmfHelper { |
260 | // This class is a private helper for GetFunctorStartAddress. The class represents the internal |
261 | // representation of a pointer-to-member-function. |
262 | |
263 | template <typename... ParamTypes> |
264 | friend struct GetFunctorStartAddress; |
265 | |
266 | #if __GNUG__ |
267 | |
268 | void* ptr; |
269 | ptrdiff_t adj; |
270 | // Layout of a pointer-to-member-function used by GCC and compatible compilers. |
271 | |
272 | void* apply(void* obj) { |
273 | #if defined(__arm__) || defined(__mips__) || defined(__aarch64__) |
274 | if (adj & 1) { |
275 | ptrdiff_t voff = (ptrdiff_t)ptr; |
276 | #else |
277 | ptrdiff_t voff = (ptrdiff_t)ptr; |
278 | if (voff & 1) { |
279 | voff &= ~1; |
280 | #endif |
281 | return *(void**)(*(char**)obj + voff); |
282 | } else { |
283 | return ptr; |
284 | } |
285 | } |
286 | |
287 | #define BODY \ |
288 | PtmfHelper result; \ |
289 | static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \ |
290 | memcpy(&result, &p, sizeof(result)); \ |
291 | return result |
292 | |
293 | #else // __GNUG__ |
294 | |
295 | void* apply(void* obj) { return nullptr; } |
296 | // TODO(port): PTMF instruction address extraction |
297 | |
298 | #define BODY return PtmfHelper{} |
299 | |
300 | #endif // __GNUG__, else |
301 | |
302 | template <typename R, typename C, typename... P, typename F> |
303 | static PtmfHelper from(F p) { BODY; } |
304 | // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not |
305 | // overloaded nor a template. In this case the compiler is able to deduce the full function |
306 | // signature directly given the name since there is only one function with that name. |
307 | |
308 | template <typename R, typename C, typename... P> |
309 | static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; } |
310 | template <typename R, typename C, typename... P> |
311 | static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; } |
312 | // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case |
313 | // the function must match exactly the containing type C, return type R, and parameter types P... |
314 | // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a |
315 | // guess at P. Luckily, if the function parameters are template parameters then it's not |
316 | // necessary to be precise about P. |
317 | #undef BODY |
318 | }; |
319 | |
320 | #if __GNUC__ >= 8 && !__clang__ |
321 | #pragma GCC diagnostic pop |
322 | #endif |
323 | |
324 | template <typename... ParamTypes> |
325 | struct GetFunctorStartAddress { |
326 | // Given a functor (any object defining operator()), return the start address of the function, |
327 | // suitable for passing to addr2line to obtain a source file/line for debugging purposes. |
328 | // |
329 | // This turns out to be incredibly hard to implement in the presence of overloaded or templated |
330 | // functors. Therefore, we impose these specific restrictions, specific to our use case: |
331 | // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas |
332 | // anyway.) |
333 | // - The template parameters to GetFunctorStartAddress specify a hint as to the expected |
334 | // parameter types. If the functor is templated, its parameters must match exactly these types. |
335 | // (If it's not templated, ParamTypes are ignored.) |
336 | |
337 | template <typename Func> |
338 | static void* apply(Func&& func) { |
339 | typedef decltype(func(instance<ParamTypes>()...)) ReturnType; |
340 | return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>( |
341 | &Decay<Func>::operator()).apply(&func); |
342 | } |
343 | }; |
344 | |
345 | template <> |
346 | struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {}; |
347 | // Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function |
348 | // actually has no parameters. |
349 | |
350 | class TransformPromiseNodeBase: public PromiseNode { |
351 | public: |
352 | TransformPromiseNodeBase(Own<PromiseNode>&& dependency, void* continuationTracePtr); |
353 | |
354 | void onReady(Event* event) noexcept override; |
355 | void get(ExceptionOrValue& output) noexcept override; |
356 | PromiseNode* getInnerForTrace() override; |
357 | |
358 | private: |
359 | Own<PromiseNode> dependency; |
360 | void* continuationTracePtr; |
361 | |
362 | void dropDependency(); |
363 | void getDepResult(ExceptionOrValue& output); |
364 | |
365 | virtual void getImpl(ExceptionOrValue& output) = 0; |
366 | |
367 | template <typename, typename, typename, typename> |
368 | friend class TransformPromiseNode; |
369 | }; |
370 | |
371 | template <typename T, typename DepT, typename Func, typename ErrorFunc> |
372 | class TransformPromiseNode final: public TransformPromiseNodeBase { |
373 | // A PromiseNode that transforms the result of another PromiseNode through an application-provided |
374 | // function (implements `then()`). |
375 | |
376 | public: |
377 | TransformPromiseNode(Own<PromiseNode>&& dependency, Func&& func, ErrorFunc&& errorHandler) |
378 | : TransformPromiseNodeBase(kj::mv(dependency), |
379 | GetFunctorStartAddress<DepT&&>::apply(func)), |
380 | func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {} |
381 | |
382 | ~TransformPromiseNode() noexcept(false) { |
383 | // We need to make sure the dependency is deleted before we delete the continuations because it |
384 | // is a common pattern for the continuations to hold ownership of objects that might be in-use |
385 | // by the dependency. |
386 | dropDependency(); |
387 | } |
388 | |
389 | private: |
390 | Func func; |
391 | ErrorFunc errorHandler; |
392 | |
393 | void getImpl(ExceptionOrValue& output) override { |
394 | ExceptionOr<DepT> depResult; |
395 | getDepResult(depResult); |
396 | KJ_IF_MAYBE(depException, depResult.exception) { |
397 | output.as<T>() = handle( |
398 | MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply( |
399 | errorHandler, kj::mv(*depException))); |
400 | } else KJ_IF_MAYBE(depValue, depResult.value) { |
401 | output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue))); |
402 | } |
403 | } |
404 | |
405 | ExceptionOr<T> handle(T&& value) { |
406 | return kj::mv(value); |
407 | } |
408 | ExceptionOr<T> handle(PropagateException::Bottom&& value) { |
409 | return ExceptionOr<T>(false, value.asException()); |
410 | } |
411 | }; |
412 | |
413 | // ------------------------------------------------------------------- |
414 | |
415 | class ForkHubBase; |
416 | |
417 | class ForkBranchBase: public PromiseNode { |
418 | public: |
419 | ForkBranchBase(Own<ForkHubBase>&& hub); |
420 | ~ForkBranchBase() noexcept(false); |
421 | |
422 | void hubReady() noexcept; |
423 | // Called by the hub to indicate that it is ready. |
424 | |
425 | // implements PromiseNode ------------------------------------------ |
426 | void onReady(Event* event) noexcept override; |
427 | PromiseNode* getInnerForTrace() override; |
428 | |
429 | protected: |
430 | inline ExceptionOrValue& getHubResultRef(); |
431 | |
432 | void releaseHub(ExceptionOrValue& output); |
433 | // Release the hub. If an exception is thrown, add it to `output`. |
434 | |
435 | private: |
436 | OnReadyEvent onReadyEvent; |
437 | |
438 | Own<ForkHubBase> hub; |
439 | ForkBranchBase* next = nullptr; |
440 | ForkBranchBase** prevPtr = nullptr; |
441 | |
442 | friend class ForkHubBase; |
443 | }; |
444 | |
445 | template <typename T> T copyOrAddRef(T& t) { return t; } |
446 | template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); } |
447 | |
448 | template <typename T> |
449 | class ForkBranch final: public ForkBranchBase { |
450 | // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives |
451 | // a const reference. |
452 | |
453 | public: |
454 | ForkBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {} |
455 | |
456 | void get(ExceptionOrValue& output) noexcept override { |
457 | ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); |
458 | KJ_IF_MAYBE(value, hubResult.value) { |
459 | output.as<T>().value = copyOrAddRef(*value); |
460 | } else { |
461 | output.as<T>().value = nullptr; |
462 | } |
463 | output.exception = hubResult.exception; |
464 | releaseHub(output); |
465 | } |
466 | }; |
467 | |
468 | template <typename T, size_t index> |
469 | class SplitBranch final: public ForkBranchBase { |
470 | // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives |
471 | // a const reference. |
472 | |
473 | public: |
474 | SplitBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {} |
475 | |
476 | typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element; |
477 | |
478 | void get(ExceptionOrValue& output) noexcept override { |
479 | ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); |
480 | KJ_IF_MAYBE(value, hubResult.value) { |
481 | output.as<Element>().value = kj::mv(kj::get<index>(*value)); |
482 | } else { |
483 | output.as<Element>().value = nullptr; |
484 | } |
485 | output.exception = hubResult.exception; |
486 | releaseHub(output); |
487 | } |
488 | }; |
489 | |
490 | // ------------------------------------------------------------------- |
491 | |
492 | class ForkHubBase: public Refcounted, protected Event { |
493 | public: |
494 | ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef); |
495 | |
496 | inline ExceptionOrValue& getResultRef() { return resultRef; } |
497 | |
498 | private: |
499 | Own<PromiseNode> inner; |
500 | ExceptionOrValue& resultRef; |
501 | |
502 | ForkBranchBase* headBranch = nullptr; |
503 | ForkBranchBase** tailBranch = &headBranch; |
504 | // Tail becomes null once the inner promise is ready and all branches have been notified. |
505 | |
506 | Maybe<Own<Event>> fire() override; |
507 | _::PromiseNode* getInnerForTrace() override; |
508 | |
509 | friend class ForkBranchBase; |
510 | }; |
511 | |
512 | template <typename T> |
513 | class ForkHub final: public ForkHubBase { |
514 | // A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces |
515 | // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if |
516 | // possible). |
517 | |
518 | public: |
519 | ForkHub(Own<PromiseNode>&& inner): ForkHubBase(kj::mv(inner), result) {} |
520 | |
521 | Promise<_::UnfixVoid<T>> addBranch() { |
522 | return Promise<_::UnfixVoid<T>>(false, kj::heap<ForkBranch<T>>(addRef(*this))); |
523 | } |
524 | |
525 | _::SplitTuplePromise<T> split() { |
526 | return splitImpl(MakeIndexes<tupleSize<T>()>()); |
527 | } |
528 | |
529 | private: |
530 | ExceptionOr<T> result; |
531 | |
532 | template <size_t... indexes> |
533 | _::SplitTuplePromise<T> splitImpl(Indexes<indexes...>) { |
534 | return kj::tuple(addSplit<indexes>()...); |
535 | } |
536 | |
537 | template <size_t index> |
538 | ReducePromises<typename SplitBranch<T, index>::Element> addSplit() { |
539 | return ReducePromises<typename SplitBranch<T, index>::Element>( |
540 | false, maybeChain(kj::heap<SplitBranch<T, index>>(addRef(*this)), |
541 | implicitCast<typename SplitBranch<T, index>::Element*>(nullptr))); |
542 | } |
543 | }; |
544 | |
545 | inline ExceptionOrValue& ForkBranchBase::getHubResultRef() { |
546 | return hub->getResultRef(); |
547 | } |
548 | |
549 | // ------------------------------------------------------------------- |
550 | |
551 | class ChainPromiseNode final: public PromiseNode, public Event { |
552 | // Promise node which reduces Promise<Promise<T>> to Promise<T>. |
553 | // |
554 | // `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to |
555 | // Own<Event>. Ugh, templates and private... |
556 | |
557 | public: |
558 | explicit ChainPromiseNode(Own<PromiseNode> inner); |
559 | ~ChainPromiseNode() noexcept(false); |
560 | |
561 | void onReady(Event* event) noexcept override; |
562 | void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept override; |
563 | void get(ExceptionOrValue& output) noexcept override; |
564 | PromiseNode* getInnerForTrace() override; |
565 | |
566 | private: |
567 | enum State { |
568 | STEP1, |
569 | STEP2 |
570 | }; |
571 | |
572 | State state; |
573 | |
574 | Own<PromiseNode> inner; |
575 | // In STEP1, a PromiseNode for a Promise<T>. |
576 | // In STEP2, a PromiseNode for a T. |
577 | |
578 | Event* onReadyEvent = nullptr; |
579 | Own<PromiseNode>* selfPtr = nullptr; |
580 | |
581 | Maybe<Own<Event>> fire() override; |
582 | }; |
583 | |
584 | template <typename T> |
585 | Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, Promise<T>*) { |
586 | return heap<ChainPromiseNode>(kj::mv(node)); |
587 | } |
588 | |
589 | template <typename T> |
590 | Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, T*) { |
591 | return kj::mv(node); |
592 | } |
593 | |
594 | template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))> |
595 | inline Result maybeReduce(Promise<T>&& promise, bool) { |
596 | return T::reducePromise(kj::mv(promise)); |
597 | } |
598 | |
599 | template <typename T> |
600 | inline Promise<T> maybeReduce(Promise<T>&& promise, ...) { |
601 | return kj::mv(promise); |
602 | } |
603 | |
604 | // ------------------------------------------------------------------- |
605 | |
606 | class ExclusiveJoinPromiseNode final: public PromiseNode { |
607 | public: |
608 | ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right); |
609 | ~ExclusiveJoinPromiseNode() noexcept(false); |
610 | |
611 | void onReady(Event* event) noexcept override; |
612 | void get(ExceptionOrValue& output) noexcept override; |
613 | PromiseNode* getInnerForTrace() override; |
614 | |
615 | private: |
616 | class Branch: public Event { |
617 | public: |
618 | Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency); |
619 | ~Branch() noexcept(false); |
620 | |
621 | bool get(ExceptionOrValue& output); |
622 | // Returns true if this is the side that finished. |
623 | |
624 | Maybe<Own<Event>> fire() override; |
625 | _::PromiseNode* getInnerForTrace() override; |
626 | |
627 | private: |
628 | ExclusiveJoinPromiseNode& joinNode; |
629 | Own<PromiseNode> dependency; |
630 | }; |
631 | |
632 | Branch left; |
633 | Branch right; |
634 | OnReadyEvent onReadyEvent; |
635 | }; |
636 | |
637 | // ------------------------------------------------------------------- |
638 | |
639 | class ArrayJoinPromiseNodeBase: public PromiseNode { |
640 | public: |
641 | ArrayJoinPromiseNodeBase(Array<Own<PromiseNode>> promises, |
642 | ExceptionOrValue* resultParts, size_t partSize); |
643 | ~ArrayJoinPromiseNodeBase() noexcept(false); |
644 | |
645 | void onReady(Event* event) noexcept override final; |
646 | void get(ExceptionOrValue& output) noexcept override final; |
647 | PromiseNode* getInnerForTrace() override final; |
648 | |
649 | protected: |
650 | virtual void getNoError(ExceptionOrValue& output) noexcept = 0; |
651 | // Called to compile the result only in the case where there were no errors. |
652 | |
653 | private: |
654 | uint countLeft; |
655 | OnReadyEvent onReadyEvent; |
656 | |
657 | class Branch final: public Event { |
658 | public: |
659 | Branch(ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependency, |
660 | ExceptionOrValue& output); |
661 | ~Branch() noexcept(false); |
662 | |
663 | Maybe<Own<Event>> fire() override; |
664 | _::PromiseNode* getInnerForTrace() override; |
665 | |
666 | Maybe<Exception> getPart(); |
667 | // Calls dependency->get(output). If there was an exception, return it. |
668 | |
669 | private: |
670 | ArrayJoinPromiseNodeBase& joinNode; |
671 | Own<PromiseNode> dependency; |
672 | ExceptionOrValue& output; |
673 | }; |
674 | |
675 | Array<Branch> branches; |
676 | }; |
677 | |
678 | template <typename T> |
679 | class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase { |
680 | public: |
681 | ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises, |
682 | Array<ExceptionOr<T>> resultParts) |
683 | : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>)), |
684 | resultParts(kj::mv(resultParts)) {} |
685 | |
686 | protected: |
687 | void getNoError(ExceptionOrValue& output) noexcept override { |
688 | auto builder = heapArrayBuilder<T>(resultParts.size()); |
689 | for (auto& part: resultParts) { |
690 | KJ_IASSERT(part.value != nullptr, |
691 | "Bug in KJ promise framework: Promise result had neither value no exception." ); |
692 | builder.add(kj::mv(*_::readMaybe(part.value))); |
693 | } |
694 | output.as<Array<T>>() = builder.finish(); |
695 | } |
696 | |
697 | private: |
698 | Array<ExceptionOr<T>> resultParts; |
699 | }; |
700 | |
701 | template <> |
702 | class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase { |
703 | public: |
704 | ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises, |
705 | Array<ExceptionOr<_::Void>> resultParts); |
706 | ~ArrayJoinPromiseNode(); |
707 | |
708 | protected: |
709 | void getNoError(ExceptionOrValue& output) noexcept override; |
710 | |
711 | private: |
712 | Array<ExceptionOr<_::Void>> resultParts; |
713 | }; |
714 | |
715 | // ------------------------------------------------------------------- |
716 | |
717 | class EagerPromiseNodeBase: public PromiseNode, protected Event { |
718 | // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly |
719 | // evaluate it. |
720 | |
721 | public: |
722 | EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef); |
723 | |
724 | void onReady(Event* event) noexcept override; |
725 | PromiseNode* getInnerForTrace() override; |
726 | |
727 | private: |
728 | Own<PromiseNode> dependency; |
729 | OnReadyEvent onReadyEvent; |
730 | |
731 | ExceptionOrValue& resultRef; |
732 | |
733 | Maybe<Own<Event>> fire() override; |
734 | }; |
735 | |
736 | template <typename T> |
737 | class EagerPromiseNode final: public EagerPromiseNodeBase { |
738 | public: |
739 | EagerPromiseNode(Own<PromiseNode>&& dependency) |
740 | : EagerPromiseNodeBase(kj::mv(dependency), result) {} |
741 | |
742 | void get(ExceptionOrValue& output) noexcept override { |
743 | output.as<T>() = kj::mv(result); |
744 | } |
745 | |
746 | private: |
747 | ExceptionOr<T> result; |
748 | }; |
749 | |
750 | template <typename T> |
751 | Own<PromiseNode> spark(Own<PromiseNode>&& node) { |
752 | // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting |
753 | // on it. |
754 | return heap<EagerPromiseNode<T>>(kj::mv(node)); |
755 | } |
756 | |
757 | // ------------------------------------------------------------------- |
758 | |
759 | class AdapterPromiseNodeBase: public PromiseNode { |
760 | public: |
761 | void onReady(Event* event) noexcept override; |
762 | |
763 | protected: |
764 | inline void setReady() { |
765 | onReadyEvent.arm(); |
766 | } |
767 | |
768 | private: |
769 | OnReadyEvent onReadyEvent; |
770 | }; |
771 | |
772 | template <typename T, typename Adapter> |
773 | class AdapterPromiseNode final: public AdapterPromiseNodeBase, |
774 | private PromiseFulfiller<UnfixVoid<T>> { |
775 | // A PromiseNode that wraps a PromiseAdapter. |
776 | |
777 | public: |
778 | template <typename... Params> |
779 | AdapterPromiseNode(Params&&... params) |
780 | : adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {} |
781 | |
782 | void get(ExceptionOrValue& output) noexcept override { |
783 | KJ_IREQUIRE(!isWaiting()); |
784 | output.as<T>() = kj::mv(result); |
785 | } |
786 | |
787 | private: |
788 | ExceptionOr<T> result; |
789 | bool waiting = true; |
790 | Adapter adapter; |
791 | |
792 | void fulfill(T&& value) override { |
793 | if (waiting) { |
794 | waiting = false; |
795 | result = ExceptionOr<T>(kj::mv(value)); |
796 | setReady(); |
797 | } |
798 | } |
799 | |
800 | void reject(Exception&& exception) override { |
801 | if (waiting) { |
802 | waiting = false; |
803 | result = ExceptionOr<T>(false, kj::mv(exception)); |
804 | setReady(); |
805 | } |
806 | } |
807 | |
808 | bool isWaiting() override { |
809 | return waiting; |
810 | } |
811 | }; |
812 | |
813 | } // namespace _ (private) |
814 | |
815 | // ======================================================================================= |
816 | |
817 | template <typename T> |
818 | Promise<T>::Promise(_::FixVoid<T> value) |
819 | : PromiseBase(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {} |
820 | |
821 | template <typename T> |
822 | Promise<T>::Promise(kj::Exception&& exception) |
823 | : PromiseBase(heap<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {} |
824 | |
825 | template <typename T> |
826 | template <typename Func, typename ErrorFunc> |
827 | PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) { |
828 | typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; |
829 | |
830 | Own<_::PromiseNode> intermediate = |
831 | heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>( |
832 | kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)); |
833 | auto result = _::ChainPromises<_::ReturnType<Func, T>>(false, |
834 | _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr))); |
835 | return _::maybeReduce(kj::mv(result), false); |
836 | } |
837 | |
838 | namespace _ { // private |
839 | |
840 | template <typename T> |
841 | struct IdentityFunc { |
842 | inline T operator()(T&& value) const { |
843 | return kj::mv(value); |
844 | } |
845 | }; |
846 | template <typename T> |
847 | struct IdentityFunc<Promise<T>> { |
848 | inline Promise<T> operator()(T&& value) const { |
849 | return kj::mv(value); |
850 | } |
851 | }; |
852 | template <> |
853 | struct IdentityFunc<void> { |
854 | inline void operator()() const {} |
855 | }; |
856 | template <> |
857 | struct IdentityFunc<Promise<void>> { |
858 | Promise<void> operator()() const; |
859 | // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly, |
860 | // Cap'n Proto relies on being able to include this header without creating such a link-time |
861 | // dependency. |
862 | }; |
863 | |
864 | } // namespace _ (private) |
865 | |
866 | template <typename T> |
867 | template <typename ErrorFunc> |
868 | Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler) { |
869 | // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case, |
870 | // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise, |
871 | // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually |
872 | // return a promise. So we make our Func return match ErrorFunc. |
873 | return then(_::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(), |
874 | kj::fwd<ErrorFunc>(errorHandler)); |
875 | } |
876 | |
877 | template <typename T> |
878 | T Promise<T>::wait(WaitScope& waitScope) { |
879 | _::ExceptionOr<_::FixVoid<T>> result; |
880 | |
881 | _::waitImpl(kj::mv(node), result, waitScope); |
882 | |
883 | KJ_IF_MAYBE(value, result.value) { |
884 | KJ_IF_MAYBE(exception, result.exception) { |
885 | throwRecoverableException(kj::mv(*exception)); |
886 | } |
887 | return _::returnMaybeVoid(kj::mv(*value)); |
888 | } else KJ_IF_MAYBE(exception, result.exception) { |
889 | throwFatalException(kj::mv(*exception)); |
890 | } else { |
891 | // Result contained neither a value nor an exception? |
892 | KJ_UNREACHABLE; |
893 | } |
894 | } |
895 | |
896 | template <> |
897 | inline void Promise<void>::wait(WaitScope& waitScope) { |
898 | // Override <void> case to use throwRecoverableException(). |
899 | |
900 | _::ExceptionOr<_::Void> result; |
901 | |
902 | _::waitImpl(kj::mv(node), result, waitScope); |
903 | |
904 | if (result.value != nullptr) { |
905 | KJ_IF_MAYBE(exception, result.exception) { |
906 | throwRecoverableException(kj::mv(*exception)); |
907 | } |
908 | } else KJ_IF_MAYBE(exception, result.exception) { |
909 | throwRecoverableException(kj::mv(*exception)); |
910 | } else { |
911 | // Result contained neither a value nor an exception? |
912 | KJ_UNREACHABLE; |
913 | } |
914 | } |
915 | |
916 | template <typename T> |
917 | bool Promise<T>::poll(WaitScope& waitScope) { |
918 | return _::pollImpl(*node, waitScope); |
919 | } |
920 | |
921 | template <typename T> |
922 | ForkedPromise<T> Promise<T>::fork() { |
923 | return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))); |
924 | } |
925 | |
926 | template <typename T> |
927 | Promise<T> ForkedPromise<T>::addBranch() { |
928 | return hub->addBranch(); |
929 | } |
930 | |
931 | template <typename T> |
932 | _::SplitTuplePromise<T> Promise<T>::split() { |
933 | return refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))->split(); |
934 | } |
935 | |
936 | template <typename T> |
937 | Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other) { |
938 | return Promise(false, heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node))); |
939 | } |
940 | |
941 | template <typename T> |
942 | template <typename... Attachments> |
943 | Promise<T> Promise<T>::attach(Attachments&&... attachments) { |
944 | return Promise(false, kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>( |
945 | kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...))); |
946 | } |
947 | |
948 | template <typename T> |
949 | template <typename ErrorFunc> |
950 | Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler) { |
951 | // See catch_() for commentary. |
952 | return Promise(false, _::spark<_::FixVoid<T>>(then( |
953 | _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(), |
954 | kj::fwd<ErrorFunc>(errorHandler)).node)); |
955 | } |
956 | |
957 | template <typename T> |
958 | Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr)) { |
959 | return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node))); |
960 | } |
961 | |
962 | template <typename T> |
963 | kj::String Promise<T>::trace() { |
964 | return PromiseBase::trace(); |
965 | } |
966 | |
967 | template <typename Func> |
968 | inline PromiseForResult<Func, void> evalLater(Func&& func) { |
969 | return _::yield().then(kj::fwd<Func>(func), _::PropagateException()); |
970 | } |
971 | |
972 | template <typename Func> |
973 | inline PromiseForResult<Func, void> evalNow(Func&& func) { |
974 | PromiseForResult<Func, void> result = nullptr; |
975 | KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() { |
976 | result = func(); |
977 | })) { |
978 | result = kj::mv(*e); |
979 | } |
980 | return result; |
981 | } |
982 | |
983 | template <typename T> |
984 | template <typename ErrorFunc> |
985 | void Promise<T>::detach(ErrorFunc&& errorHandler) { |
986 | return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler))); |
987 | } |
988 | |
989 | template <> |
990 | template <typename ErrorFunc> |
991 | void Promise<void>::detach(ErrorFunc&& errorHandler) { |
992 | return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler))); |
993 | } |
994 | |
995 | template <typename T> |
996 | Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises) { |
997 | return Promise<Array<T>>(false, kj::heap<_::ArrayJoinPromiseNode<T>>( |
998 | KJ_MAP(p, promises) { return kj::mv(p.node); }, |
999 | heapArray<_::ExceptionOr<T>>(promises.size()))); |
1000 | } |
1001 | |
1002 | // ======================================================================================= |
1003 | |
1004 | namespace _ { // private |
1005 | |
1006 | template <typename T> |
1007 | class WeakFulfiller final: public PromiseFulfiller<T>, private kj::Disposer { |
1008 | // A wrapper around PromiseFulfiller which can be detached. |
1009 | // |
1010 | // There are a couple non-trivialities here: |
1011 | // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly |
1012 | // rejected. |
1013 | // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been |
1014 | // detached from the underlying fulfiller, because otherwise the later detach() call will go |
1015 | // to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the |
1016 | // refcount never goes over 2 and we manually implement the refcounting because we need to do |
1017 | // other special things when each side detaches anyway. To this end, WeakFulfiller is its own |
1018 | // Disposer -- dispose() is called when the application discards its owned pointer to the |
1019 | // fulfiller and detach() is called when the promise is destroyed. |
1020 | |
1021 | public: |
1022 | KJ_DISALLOW_COPY(WeakFulfiller); |
1023 | |
1024 | static kj::Own<WeakFulfiller> make() { |
1025 | WeakFulfiller* ptr = new WeakFulfiller; |
1026 | return Own<WeakFulfiller>(ptr, *ptr); |
1027 | } |
1028 | |
1029 | void fulfill(FixVoid<T>&& value) override { |
1030 | if (inner != nullptr) { |
1031 | inner->fulfill(kj::mv(value)); |
1032 | } |
1033 | } |
1034 | |
1035 | void reject(Exception&& exception) override { |
1036 | if (inner != nullptr) { |
1037 | inner->reject(kj::mv(exception)); |
1038 | } |
1039 | } |
1040 | |
1041 | bool isWaiting() override { |
1042 | return inner != nullptr && inner->isWaiting(); |
1043 | } |
1044 | |
1045 | void attach(PromiseFulfiller<T>& newInner) { |
1046 | inner = &newInner; |
1047 | } |
1048 | |
1049 | void detach(PromiseFulfiller<T>& from) { |
1050 | if (inner == nullptr) { |
1051 | // Already disposed. |
1052 | delete this; |
1053 | } else { |
1054 | KJ_IREQUIRE(inner == &from); |
1055 | inner = nullptr; |
1056 | } |
1057 | } |
1058 | |
1059 | private: |
1060 | mutable PromiseFulfiller<T>* inner; |
1061 | |
1062 | WeakFulfiller(): inner(nullptr) {} |
1063 | |
1064 | void disposeImpl(void* pointer) const override { |
1065 | // TODO(perf): Factor some of this out so it isn't regenerated for every fulfiller type? |
1066 | |
1067 | if (inner == nullptr) { |
1068 | // Already detached. |
1069 | delete this; |
1070 | } else { |
1071 | if (inner->isWaiting()) { |
1072 | inner->reject(kj::Exception(kj::Exception::Type::FAILED, __FILE__, __LINE__, |
1073 | kj::heapString("PromiseFulfiller was destroyed without fulfilling the promise." ))); |
1074 | } |
1075 | inner = nullptr; |
1076 | } |
1077 | } |
1078 | }; |
1079 | |
1080 | template <typename T> |
1081 | class PromiseAndFulfillerAdapter { |
1082 | public: |
1083 | PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller, |
1084 | WeakFulfiller<T>& wrapper) |
1085 | : fulfiller(fulfiller), wrapper(wrapper) { |
1086 | wrapper.attach(fulfiller); |
1087 | } |
1088 | |
1089 | ~PromiseAndFulfillerAdapter() noexcept(false) { |
1090 | wrapper.detach(fulfiller); |
1091 | } |
1092 | |
1093 | private: |
1094 | PromiseFulfiller<T>& fulfiller; |
1095 | WeakFulfiller<T>& wrapper; |
1096 | }; |
1097 | |
1098 | } // namespace _ (private) |
1099 | |
1100 | template <typename T> |
1101 | template <typename Func> |
1102 | bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) { |
1103 | KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { |
1104 | reject(kj::mv(*exception)); |
1105 | return false; |
1106 | } else { |
1107 | return true; |
1108 | } |
1109 | } |
1110 | |
1111 | template <typename Func> |
1112 | bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) { |
1113 | KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { |
1114 | reject(kj::mv(*exception)); |
1115 | return false; |
1116 | } else { |
1117 | return true; |
1118 | } |
1119 | } |
1120 | |
1121 | template <typename T, typename Adapter, typename... Params> |
1122 | Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams) { |
1123 | return Promise<T>(false, heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>( |
1124 | kj::fwd<Params>(adapterConstructorParams)...)); |
1125 | } |
1126 | |
1127 | template <typename T> |
1128 | PromiseFulfillerPair<T> newPromiseAndFulfiller() { |
1129 | auto wrapper = _::WeakFulfiller<T>::make(); |
1130 | |
1131 | Own<_::PromiseNode> intermediate( |
1132 | heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper)); |
1133 | _::ReducePromises<T> promise(false, |
1134 | _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr))); |
1135 | |
1136 | return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) }; |
1137 | } |
1138 | |
1139 | } // namespace kj |
1140 | |