1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#pragma once
17
18#include <algorithm>
19#include <cassert>
20#include <chrono>
21#include <thread>
22
23#include <folly/Optional.h>
24#include <folly/executors/InlineExecutor.h>
25#include <folly/executors/QueuedImmediateExecutor.h>
26#include <folly/futures/detail/Core.h>
27#include <folly/synchronization/Baton.h>
28
29#ifndef FOLLY_FUTURE_USING_FIBER
30#if FOLLY_MOBILE || defined(__APPLE__)
31#define FOLLY_FUTURE_USING_FIBER 0
32#else
33#define FOLLY_FUTURE_USING_FIBER 1
34#include <folly/fibers/Baton.h>
35#endif
36#endif
37
38namespace folly {
39
40class Timekeeper;
41
42namespace futures {
43namespace detail {
44#if FOLLY_FUTURE_USING_FIBER
45typedef folly::fibers::Baton FutureBatonType;
46#else
47typedef folly::Baton<> FutureBatonType;
48#endif
49} // namespace detail
50} // namespace futures
51
52namespace detail {
53std::shared_ptr<Timekeeper> getTimekeeperSingleton();
54} // namespace detail
55
56namespace futures {
57namespace detail {
58// Guarantees that the stored functor is destructed before the stored promise
59// may be fulfilled. Assumes the stored functor to be noexcept-destructible.
60template <typename T, typename F>
61class CoreCallbackState {
62 using DF = std::decay_t<F>;
63
64 public:
65 CoreCallbackState(Promise<T>&& promise, F&& func) noexcept(
66 noexcept(DF(std::declval<F&&>())))
67 : func_(std::forward<F>(func)), promise_(std::move(promise)) {
68 assert(before_barrier());
69 }
70
71 CoreCallbackState(CoreCallbackState&& that) noexcept(
72 noexcept(DF(std::declval<F&&>()))) {
73 if (that.before_barrier()) {
74 new (&func_) DF(std::forward<F>(that.func_));
75 promise_ = that.stealPromise();
76 }
77 }
78
79 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
80
81 ~CoreCallbackState() {
82 if (before_barrier()) {
83 stealPromise();
84 }
85 }
86
87 template <typename... Args>
88 auto invoke(Args&&... args) noexcept(
89 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
90 assert(before_barrier());
91 return std::forward<F>(func_)(std::forward<Args>(args)...);
92 }
93
94 template <typename... Args>
95 auto tryInvoke(Args&&... args) noexcept {
96 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
97 }
98
99 void setTry(Try<T>&& t) {
100 stealPromise().setTry(std::move(t));
101 }
102
103 void setException(exception_wrapper&& ew) {
104 stealPromise().setException(std::move(ew));
105 }
106
107 Promise<T> stealPromise() noexcept {
108 assert(before_barrier());
109 func_.~DF();
110 return std::move(promise_);
111 }
112
113 private:
114 bool before_barrier() const noexcept {
115 return !promise_.isFulfilled();
116 }
117
118 union {
119 DF func_;
120 };
121 Promise<T> promise_{Promise<T>::makeEmpty()};
122};
123
124template <typename T, typename F>
125auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
126 noexcept(CoreCallbackState<T, F>(
127 std::declval<Promise<T>&&>(),
128 std::declval<F&&>()))) {
129 return CoreCallbackState<T, F>(std::move(p), std::forward<F>(f));
130}
131
132template <typename T, typename R, typename... Args>
133auto makeCoreCallbackState(Promise<T>&& p, R (&f)(Args...)) noexcept {
134 return CoreCallbackState<T, R (*)(Args...)>(std::move(p), &f);
135}
136
137template <class T>
138FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
139 other.core_ = nullptr;
140}
141
142template <class T>
143FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
144 other.core_ = nullptr;
145}
146
147template <class T>
148template <class T2, typename>
149FutureBase<T>::FutureBase(T2&& val)
150 : core_(Core::make(Try<T>(std::forward<T2>(val)))) {}
151
152template <class T>
153template <typename T2>
154FutureBase<T>::FutureBase(
155 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
156 : core_(Core::make(Try<T>(T()))) {}
157
158template <class T>
159template <
160 class... Args,
161 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
162 type>
163FutureBase<T>::FutureBase(in_place_t, Args&&... args)
164 : core_(Core::make(in_place, std::forward<Args>(args)...)) {}
165
166template <class T>
167void FutureBase<T>::assign(FutureBase<T>&& other) noexcept {
168 detach();
169 core_ = exchange(other.core_, nullptr);
170}
171
172template <class T>
173FutureBase<T>::~FutureBase() {
174 detach();
175}
176
177template <class T>
178T& FutureBase<T>::value() & {
179 return result().value();
180}
181
182template <class T>
183T const& FutureBase<T>::value() const& {
184 return result().value();
185}
186
187template <class T>
188T&& FutureBase<T>::value() && {
189 return std::move(result().value());
190}
191
192template <class T>
193T const&& FutureBase<T>::value() const&& {
194 return std::move(result().value());
195}
196
197template <class T>
198Try<T>& FutureBase<T>::result() & {
199 return getCoreTryChecked();
200}
201
202template <class T>
203Try<T> const& FutureBase<T>::result() const& {
204 return getCoreTryChecked();
205}
206
207template <class T>
208Try<T>&& FutureBase<T>::result() && {
209 return std::move(getCoreTryChecked());
210}
211
212template <class T>
213Try<T> const&& FutureBase<T>::result() const&& {
214 return std::move(getCoreTryChecked());
215}
216
217template <class T>
218bool FutureBase<T>::isReady() const {
219 return getCore().hasResult();
220}
221
222template <class T>
223bool FutureBase<T>::hasValue() const {
224 return result().hasValue();
225}
226
227template <class T>
228bool FutureBase<T>::hasException() const {
229 return result().hasException();
230}
231
232template <class T>
233void FutureBase<T>::detach() {
234 if (core_) {
235 core_->detachFuture();
236 core_ = nullptr;
237 }
238}
239
240template <class T>
241void FutureBase<T>::throwIfInvalid() const {
242 if (!core_) {
243 throw_exception<FutureInvalid>();
244 }
245}
246
247template <class T>
248void FutureBase<T>::throwIfContinued() const {
249 if (!core_ || core_->hasCallback()) {
250 throw_exception<FutureAlreadyContinued>();
251 }
252}
253
254template <class T>
255Optional<Try<T>> FutureBase<T>::poll() {
256 auto& core = getCore();
257 return core.hasResult() ? Optional<Try<T>>(std::move(core.getTry()))
258 : Optional<Try<T>>();
259}
260
261template <class T>
262void FutureBase<T>::raise(exception_wrapper exception) {
263 getCore().raise(std::move(exception));
264}
265
266template <class T>
267template <class F>
268void FutureBase<T>::setCallback_(F&& func) {
269 throwIfContinued();
270 getCore().setCallback(std::forward<F>(func), RequestContext::saveContext());
271}
272
273template <class T>
274FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
275 : core_(nullptr) {}
276
277// MSVC 2017 Update 7 released with a bug that causes issues expanding to an
278// empty parameter pack when invoking a templated member function. It should
279// be fixed for MSVC 2017 Update 8.
280// TODO: Remove.
281namespace detail_msvc_15_7_workaround {
282template <typename R, std::size_t S>
283using IfArgsSizeIs = std::enable_if_t<R::Arg::ArgsSize::value == S, int>;
284template <typename R, typename State, typename T, IfArgsSizeIs<R, 0> = 0>
285decltype(auto) invoke(R, State& state, Try<T>& /* t */) {
286 return state.invoke();
287}
288template <typename R, typename State, typename T, IfArgsSizeIs<R, 1> = 0>
289decltype(auto) invoke(R, State& state, Try<T>& t) {
290 using Arg0 = typename R::Arg::ArgList::FirstArg;
291 return state.invoke(t.template get<R::Arg::isTry(), Arg0>());
292}
293template <typename R, typename State, typename T, IfArgsSizeIs<R, 0> = 0>
294decltype(auto) tryInvoke(R, State& state, Try<T>& /* t */) {
295 return state.tryInvoke();
296}
297template <typename R, typename State, typename T, IfArgsSizeIs<R, 1> = 0>
298decltype(auto) tryInvoke(R, State& state, Try<T>& t) {
299 using Arg0 = typename R::Arg::ArgList::FirstArg;
300 return state.tryInvoke(t.template get<R::Arg::isTry(), Arg0>());
301}
302} // namespace detail_msvc_15_7_workaround
303
304// then
305
306// Variant: returns a value
307// e.g. f.then([](Try<T>&& t){ return t.value(); });
308template <class T>
309template <typename F, typename R>
310typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
311FutureBase<T>::thenImplementation(F&& func, R) {
312 static_assert(
313 R::Arg::ArgsSize::value <= 1, "Then must take zero/one argument");
314 typedef typename R::ReturnsFuture::Inner B;
315
316 Promise<B> p;
317 p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler());
318
319 // grab the Future now before we lose our handle on the Promise
320 auto sf = p.getSemiFuture();
321 sf.setExecutor(this->getExecutor());
322 auto f = Future<B>(sf.core_);
323 sf.core_ = nullptr;
324
325 /* This is a bit tricky.
326
327 We can't just close over *this in case this Future gets moved. So we
328 make a new dummy Future. We could figure out something more
329 sophisticated that avoids making a new Future object when it can, as an
330 optimization. But this is correct.
331
332 core_ can't be moved, it is explicitly disallowed (as is copying). But
333 if there's ever a reason to allow it, this is one place that makes that
334 assumption and would need to be fixed. We use a standard shared pointer
335 for core_ (by copying it in), which means in essence obj holds a shared
336 pointer to itself. But this shouldn't leak because Promise will not
337 outlive the continuation, because Promise will setException() with a
338 broken Promise if it is destructed before completed. We could use a
339 weak pointer but it would have to be converted to a shared pointer when
340 func is executed (because the Future returned by func may possibly
341 persist beyond the callback, if it gets moved), and so it is an
342 optimization to just make it shared from the get-go.
343
344 Two subtle but important points about this design. futures::detail::Core
345 has no back pointers to Future or Promise, so if Future or Promise get
346 moved (and they will be moved in performant code) we don't have to do
347 anything fancy. And because we store the continuation in the
348 futures::detail::Core, not in the Future, we can execute the continuation
349 even after the Future has gone out of scope. This is an intentional design
350 decision. It is likely we will want to be able to cancel a continuation
351 in some circumstances, but I think it should be explicit not implicit
352 in the destruction of the Future used to create it.
353 */
354 this->setCallback_(
355 [state = futures::detail::makeCoreCallbackState(
356 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
357 if (!R::Arg::isTry() && t.hasException()) {
358 state.setException(std::move(t.exception()));
359 } else {
360 state.setTry(makeTryWith([&] {
361 return detail_msvc_15_7_workaround::invoke(R{}, state, t);
362 }));
363 }
364 });
365 return f;
366}
367
368// Pass through a simple future as it needs no deferral adaptation
369template <class T>
370Future<T> chainExecutor(Executor*, Future<T>&& f) {
371 return std::move(f);
372}
373
374// Correctly chain a SemiFuture for deferral
375template <class T>
376Future<T> chainExecutor(Executor* e, SemiFuture<T>&& f) {
377 if (!e) {
378 e = &InlineExecutor::instance();
379 }
380 return std::move(f).via(e);
381}
382
383// Variant: returns a Future
384// e.g. f.then([](T&& t){ return makeFuture<T>(t); });
385template <class T>
386template <typename F, typename R>
387typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
388FutureBase<T>::thenImplementation(F&& func, R) {
389 static_assert(
390 R::Arg::ArgsSize::value <= 1, "Then must take zero/one argument");
391 typedef typename R::ReturnsFuture::Inner B;
392
393 Promise<B> p;
394 p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler());
395
396 // grab the Future now before we lose our handle on the Promise
397 auto sf = p.getSemiFuture();
398 auto* e = this->getExecutor();
399 sf.setExecutor(e);
400 auto f = Future<B>(sf.core_);
401 sf.core_ = nullptr;
402
403 this->setCallback_(
404 [state = futures::detail::makeCoreCallbackState(
405 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
406 if (!R::Arg::isTry() && t.hasException()) {
407 state.setException(std::move(t.exception()));
408 } else {
409 // Ensure that if function returned a SemiFuture we correctly chain
410 // potential deferral.
411 auto tf2 = detail_msvc_15_7_workaround::tryInvoke(R{}, state, t);
412 if (tf2.hasException()) {
413 state.setException(std::move(tf2.exception()));
414 } else {
415 auto statePromise = state.stealPromise();
416 auto tf3 = chainExecutor(
417 statePromise.core_->getExecutor(), *std::move(tf2));
418 std::exchange(statePromise.core_, nullptr)
419 ->setProxy(std::exchange(tf3.core_, nullptr));
420 }
421 }
422 });
423
424 return f;
425}
426
427template <class T>
428template <typename E>
429SemiFuture<T>
430FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) && {
431 struct Context {
432 explicit Context(E ex) : exception(std::move(ex)) {}
433 E exception;
434 Future<Unit> thisFuture;
435 Promise<T> promise;
436 std::atomic<bool> token{false};
437 };
438
439 std::shared_ptr<Timekeeper> tks;
440 if (LIKELY(!tk)) {
441 tks = folly::detail::getTimekeeperSingleton();
442 tk = tks.get();
443 }
444
445 if (UNLIKELY(!tk)) {
446 return makeSemiFuture<T>(FutureNoTimekeeper());
447 }
448
449 auto ctx = std::make_shared<Context>(std::move(e));
450
451 auto f = [ctx](Try<T>&& t) {
452 if (!ctx->token.exchange(true, std::memory_order_relaxed)) {
453 ctx->promise.setTry(std::move(t));
454 }
455 };
456 using R = futures::detail::callableResult<T, decltype(f)>;
457 ctx->thisFuture = this->thenImplementation(std::move(f), R{});
458
459 // Properly propagate interrupt values through futures chained after within()
460 ctx->promise.setInterruptHandler(
461 [weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) {
462 if (auto lockedCtx = weakCtx.lock()) {
463 lockedCtx->thisFuture.raise(ex);
464 }
465 });
466
467 // Have time keeper use a weak ptr to hold ctx,
468 // so that ctx can be deallocated as soon as the future job finished.
469 tk->after(dur).thenTry([weakCtx = to_weak_ptr(ctx)](Try<Unit>&& t) mutable {
470 auto lockedCtx = weakCtx.lock();
471 if (!lockedCtx) {
472 // ctx already released. "this" completed first, cancel "after"
473 return;
474 }
475 // "after" completed first, cancel "this"
476 lockedCtx->thisFuture.raise(FutureTimeout());
477 if (!lockedCtx->token.exchange(true, std::memory_order_relaxed)) {
478 if (t.hasException()) {
479 lockedCtx->promise.setException(std::move(t.exception()));
480 } else {
481 lockedCtx->promise.setException(std::move(lockedCtx->exception));
482 }
483 }
484 });
485
486 return ctx->promise.getSemiFuture();
487}
488
489/**
490 * Defer work until executor is actively boosted.
491 *
492 * NOTE: that this executor is a private implementation detail belonging to the
493 * Folly Futures library and not intended to be used elsewhere. It is designed
494 * specifically for the use case of deferring work on a SemiFuture. It is NOT
495 * thread safe. Please do not use for any other purpose without great care.
496 */
497class DeferredExecutor final : public Executor {
498 public:
499 void add(Func func) override {
500 auto state = state_.load(std::memory_order_acquire);
501 if (state == State::DETACHED) {
502 return;
503 }
504 if (state == State::HAS_EXECUTOR) {
505 executor_->add(std::move(func));
506 return;
507 }
508 DCHECK(state == State::EMPTY);
509 func_ = std::move(func);
510 if (state_.compare_exchange_strong(
511 state,
512 State::HAS_FUNCTION,
513 std::memory_order_release,
514 std::memory_order_acquire)) {
515 return;
516 }
517 DCHECK(state == State::DETACHED || state == State::HAS_EXECUTOR);
518 if (state == State::DETACHED) {
519 std::exchange(func_, nullptr);
520 return;
521 }
522 executor_->add(std::exchange(func_, nullptr));
523 }
524
525 Executor* getExecutor() const {
526 assert(executor_.get());
527 return executor_.get();
528 }
529
530 void setExecutor(folly::Executor::KeepAlive<> executor) {
531 if (nestedExecutors_) {
532 auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
533 for (auto& nestedExecutor : *nestedExecutors) {
534 nestedExecutor->setExecutor(executor.copy());
535 }
536 }
537 executor_ = std::move(executor);
538 auto state = state_.load(std::memory_order_acquire);
539 if (state == State::EMPTY &&
540 state_.compare_exchange_strong(
541 state,
542 State::HAS_EXECUTOR,
543 std::memory_order_release,
544 std::memory_order_acquire)) {
545 return;
546 }
547
548 DCHECK(state == State::HAS_FUNCTION);
549 state_.store(State::HAS_EXECUTOR, std::memory_order_release);
550 executor_->add(std::exchange(func_, nullptr));
551 }
552
553 void detach() {
554 if (nestedExecutors_) {
555 auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
556 for (auto& nestedExecutor : *nestedExecutors) {
557 nestedExecutor->detach();
558 }
559 }
560 auto state = state_.load(std::memory_order_acquire);
561 if (state == State::EMPTY &&
562 state_.compare_exchange_strong(
563 state,
564 State::DETACHED,
565 std::memory_order_release,
566 std::memory_order_acquire)) {
567 return;
568 }
569
570 DCHECK(state == State::HAS_FUNCTION);
571 state_.store(State::DETACHED, std::memory_order_release);
572 std::exchange(func_, nullptr);
573 }
574
575 void setNestedExecutors(
576 std::vector<folly::Executor::KeepAlive<DeferredExecutor>> executors) {
577 DCHECK(!nestedExecutors_);
578 nestedExecutors_ = std::make_unique<
579 std::vector<folly::Executor::KeepAlive<DeferredExecutor>>>(
580 std::move(executors));
581 }
582
583 static KeepAlive<DeferredExecutor> create() {
584 return makeKeepAlive<DeferredExecutor>(new DeferredExecutor());
585 }
586
587 private:
588 DeferredExecutor() {}
589
590 bool keepAliveAcquire() override {
591 auto keepAliveCount =
592 keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
593 DCHECK(keepAliveCount > 0);
594 return true;
595 }
596
597 void keepAliveRelease() override {
598 auto keepAliveCount =
599 keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
600 DCHECK(keepAliveCount > 0);
601 if (keepAliveCount == 1) {
602 delete this;
603 }
604 }
605
606 enum class State { EMPTY, HAS_FUNCTION, HAS_EXECUTOR, DETACHED };
607 std::atomic<State> state_{State::EMPTY};
608 Func func_;
609 folly::Executor::KeepAlive<> executor_;
610 std::unique_ptr<std::vector<folly::Executor::KeepAlive<DeferredExecutor>>>
611 nestedExecutors_;
612 std::atomic<ssize_t> keepAliveCount_{1};
613};
614
615class WaitExecutor final : public folly::Executor {
616 public:
617 void add(Func func) override {
618 auto wQueue = queue_.wlock();
619 if (wQueue->detached) {
620 return;
621 }
622 bool empty = wQueue->funcs.empty();
623 wQueue->funcs.push_back(std::move(func));
624 if (empty) {
625 baton_.post();
626 }
627 }
628
629 void drive() {
630 baton_.wait();
631 baton_.reset();
632 auto funcs = std::move(queue_.wlock()->funcs);
633 for (auto& func : funcs) {
634 std::exchange(func, nullptr)();
635 }
636 }
637
638 using Clock = std::chrono::steady_clock;
639
640 bool driveUntil(Clock::time_point deadline) {
641 if (!baton_.try_wait_until(deadline)) {
642 return false;
643 }
644 baton_.reset();
645 auto funcs = std::move(queue_.wlock()->funcs);
646 for (auto& func : funcs) {
647 std::exchange(func, nullptr)();
648 }
649 return true;
650 }
651
652 void detach() {
653 // Make sure we don't hold the lock while destroying funcs.
654 [&] {
655 auto wQueue = queue_.wlock();
656 wQueue->detached = true;
657 return std::move(wQueue->funcs);
658 }();
659 }
660
661 static KeepAlive<WaitExecutor> create() {
662 return makeKeepAlive<WaitExecutor>(new WaitExecutor());
663 }
664
665 private:
666 WaitExecutor() {}
667
668 bool keepAliveAcquire() override {
669 auto keepAliveCount =
670 keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
671 DCHECK(keepAliveCount > 0);
672 return true;
673 }
674
675 void keepAliveRelease() override {
676 auto keepAliveCount =
677 keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
678 DCHECK(keepAliveCount > 0);
679 if (keepAliveCount == 1) {
680 delete this;
681 }
682 }
683
684 struct Queue {
685 std::vector<Func> funcs;
686 bool detached{false};
687 };
688
689 folly::Synchronized<Queue> queue_;
690 FutureBatonType baton_;
691
692 std::atomic<ssize_t> keepAliveCount_{1};
693};
694
// Vector-like structure to play with window,
// which otherwise expects a vector of size `times`,
// which would be expensive with large `times` sizes.
//
// It stores only a length: element i is simply the index i.
struct WindowFakeVector {
  using iterator = std::vector<size_t>::iterator;

  WindowFakeVector(size_t size) : size_{size} {}

  // Reported length of the pretend vector.
  size_t size() const {
    return size_;
  }

  // The "element" at position i is i itself; no bounds check is performed.
  size_t operator[](size_t i) const {
    return i;
  }

 private:
  size_t size_;
};
713} // namespace detail
714} // namespace futures
715
716template <class T>
717SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
718 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
719}
720
721// makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
722template <class F>
723typename std::enable_if<
724 isFutureOrSemiFuture<invoke_result_t<F>>::value,
725 SemiFuture<typename invoke_result_t<F>::value_type>>::type
726makeSemiFutureWith(F&& func) {
727 using InnerType = typename isFutureOrSemiFuture<invoke_result_t<F>>::Inner;
728 try {
729 return std::forward<F>(func)();
730 } catch (std::exception& e) {
731 return makeSemiFuture<InnerType>(
732 exception_wrapper(std::current_exception(), e));
733 } catch (...) {
734 return makeSemiFuture<InnerType>(
735 exception_wrapper(std::current_exception()));
736 }
737}
738
739// makeSemiFutureWith(T()) -> SemiFuture<T>
740// makeSemiFutureWith(void()) -> SemiFuture<Unit>
741template <class F>
742typename std::enable_if<
743 !(isFutureOrSemiFuture<invoke_result_t<F>>::value),
744 SemiFuture<lift_unit_t<invoke_result_t<F>>>>::type
745makeSemiFutureWith(F&& func) {
746 using LiftedResult = lift_unit_t<invoke_result_t<F>>;
747 return makeSemiFuture<LiftedResult>(
748 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
749}
750
751template <class T>
752SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
753 return makeSemiFuture(Try<T>(e));
754}
755
756template <class T>
757SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
758 return makeSemiFuture(Try<T>(std::move(ew)));
759}
760
761template <class T, class E>
762typename std::
763 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
764 makeSemiFuture(E const& e) {
765 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
766}
767
768template <class T>
769SemiFuture<T> makeSemiFuture(Try<T> t) {
770 return SemiFuture<T>(SemiFuture<T>::Core::make(std::move(t)));
771}
772
773// This must be defined after the constructors to avoid a bug in MSVC
774// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
775inline SemiFuture<Unit> makeSemiFuture() {
776 return makeSemiFuture(Unit{});
777}
778
779template <class T>
780SemiFuture<T> SemiFuture<T>::makeEmpty() {
781 return SemiFuture<T>(futures::detail::EmptyConstruct{});
782}
783
784template <class T>
785typename SemiFuture<T>::DeferredExecutor* SemiFuture<T>::getDeferredExecutor()
786 const {
787 if (auto executor = this->getExecutor()) {
788 assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
789 return static_cast<DeferredExecutor*>(executor);
790 }
791 return nullptr;
792}
793
794template <class T>
795folly::Executor::KeepAlive<typename SemiFuture<T>::DeferredExecutor>
796SemiFuture<T>::stealDeferredExecutor() const {
797 if (auto executor = this->getExecutor()) {
798 assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
799 auto executorKeepAlive =
800 folly::getKeepAliveToken(static_cast<DeferredExecutor*>(executor));
801 this->core_->setExecutor(nullptr);
802 return executorKeepAlive;
803 }
804 return {};
805}
806
807template <class T>
808void SemiFuture<T>::releaseDeferredExecutor(Core* core) {
809 if (!core) {
810 return;
811 }
812 if (auto executor = core->getExecutor()) {
813 assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
814 static_cast<DeferredExecutor*>(executor)->detach();
815 core->setExecutor(nullptr);
816 }
817}
818
819template <class T>
820SemiFuture<T>::~SemiFuture() {
821 releaseDeferredExecutor(this->core_);
822}
823
824template <class T>
825SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
826 : futures::detail::FutureBase<T>(std::move(other)) {}
827
828template <class T>
829SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
830 : futures::detail::FutureBase<T>(std::move(other)) {
831 // SemiFuture should not have an executor on construction
832 if (this->core_) {
833 this->setExecutor(nullptr);
834 }
835}
836
837template <class T>
838SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
839 releaseDeferredExecutor(this->core_);
840 this->assign(std::move(other));
841 return *this;
842}
843
844template <class T>
845SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
846 releaseDeferredExecutor(this->core_);
847 this->assign(std::move(other));
848 // SemiFuture should not have an executor on construction
849 if (this->core_) {
850 this->setExecutor(nullptr);
851 }
852 return *this;
853}
854
855template <class T>
856Future<T> SemiFuture<T>::via(
857 Executor::KeepAlive<> executor,
858 int8_t priority) && {
859 if (!executor) {
860 throw_exception<FutureNoExecutor>();
861 }
862
863 if (auto deferredExecutor = getDeferredExecutor()) {
864 deferredExecutor->setExecutor(executor.copy());
865 }
866
867 auto newFuture = Future<T>(this->core_);
868 this->core_ = nullptr;
869 newFuture.setExecutor(std::move(executor), priority);
870
871 return newFuture;
872}
873
874template <class T>
875Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
876 return std::move(*this).via(getKeepAliveToken(executor), priority);
877}
878
879template <class T>
880Future<T> SemiFuture<T>::toUnsafeFuture() && {
881 return std::move(*this).via(&InlineExecutor::instance());
882}
883
884template <class T>
885template <typename F>
886SemiFuture<typename futures::detail::tryCallableResult<T, F>::value_type>
887SemiFuture<T>::defer(F&& func) && {
888 DeferredExecutor* deferredExecutor = getDeferredExecutor();
889 if (!deferredExecutor) {
890 auto newDeferredExecutor = DeferredExecutor::create();
891 deferredExecutor = newDeferredExecutor.get();
892 this->setExecutor(std::move(newDeferredExecutor));
893 }
894
895 auto sf = Future<T>(this->core_).thenTry(std::forward<F>(func)).semi();
896 this->core_ = nullptr;
897 // Carry deferred executor through chain as constructor from Future will
898 // nullify it
899 sf.setExecutor(deferredExecutor);
900 return sf;
901}
902
903template <class T>
904template <typename F>
905SemiFuture<
906 typename futures::detail::tryExecutorCallableResult<T, F>::value_type>
907SemiFuture<T>::defer(F&& func) && {
908 DeferredExecutor* deferredExecutor = getDeferredExecutor();
909 if (!deferredExecutor) {
910 auto newDeferredExecutor = DeferredExecutor::create();
911 deferredExecutor = newDeferredExecutor.get();
912 this->setExecutor(std::move(newDeferredExecutor));
913 }
914 return std::move(*this).defer(
915 [deferredExecutor, f = std::forward<F>(func)](Try<T>&& t) mutable {
916 return f(deferredExecutor->getExecutor(), std::move(t));
917 });
918}
919
920template <class T>
921template <typename F>
922SemiFuture<typename futures::detail::valueCallableResult<T, F>::value_type>
923SemiFuture<T>::deferValue(F&& func) && {
924 return std::move(*this).defer([f = std::forward<F>(func)](
925 folly::Try<T>&& t) mutable {
926 return std::forward<F>(f)(
927 t.template get<
928 false,
929 typename futures::detail::valueCallableResult<T, F>::FirstArg>());
930 });
931}
932
933template <class T>
934template <class ExceptionType, class F>
935SemiFuture<T> SemiFuture<T>::deferError(F&& func) && {
936 return std::move(*this).defer(
937 [func = std::forward<F>(func)](Try<T>&& t) mutable {
938 if (auto e = t.template tryGetExceptionObject<ExceptionType>()) {
939 return makeSemiFutureWith(
940 [&]() mutable { return std::forward<F>(func)(*e); });
941 } else {
942 return makeSemiFuture<T>(std::move(t));
943 }
944 });
945}
946
947template <class T>
948template <class F>
949SemiFuture<T> SemiFuture<T>::deferError(F&& func) && {
950 return std::move(*this).defer(
951 [func = std::forward<F>(func)](Try<T> t) mutable {
952 if (t.hasException()) {
953 return makeSemiFutureWith([&]() mutable {
954 return std::forward<F>(func)(std::move(t.exception()));
955 });
956 } else {
957 return makeSemiFuture<T>(std::move(t));
958 }
959 });
960}
961
962template <typename T>
963SemiFuture<T> SemiFuture<T>::delayed(Duration dur, Timekeeper* tk) && {
964 return collectAllSemiFuture(*this, futures::sleep(dur, tk))
965 .toUnsafeFuture()
966 .thenValue([](std::tuple<Try<T>, Try<Unit>> tup) {
967 Try<T>& t = std::get<0>(tup);
968 return makeFuture<T>(std::move(t));
969 });
970}
971
972template <class T>
973Future<T> Future<T>::makeEmpty() {
974 return Future<T>(futures::detail::EmptyConstruct{});
975}
976
977template <class T>
978Future<T>::Future(Future<T>&& other) noexcept
979 : futures::detail::FutureBase<T>(std::move(other)) {}
980
981template <class T>
982Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
983 this->assign(std::move(other));
984 return *this;
985}
986
987template <class T>
988template <
989 class T2,
990 typename std::enable_if<
991 !std::is_same<T, typename std::decay<T2>::type>::value &&
992 std::is_constructible<T, T2&&>::value &&
993 std::is_convertible<T2&&, T>::value,
994 int>::type>
995Future<T>::Future(Future<T2>&& other)
996 : Future(
997 std::move(other).thenValue([](T2&& v) { return T(std::move(v)); })) {}
998
999template <class T>
1000template <
1001 class T2,
1002 typename std::enable_if<
1003 !std::is_same<T, typename std::decay<T2>::type>::value &&
1004 std::is_constructible<T, T2&&>::value &&
1005 !std::is_convertible<T2&&, T>::value,
1006 int>::type>
1007Future<T>::Future(Future<T2>&& other)
1008 : Future(
1009 std::move(other).thenValue([](T2&& v) { return T(std::move(v)); })) {}
1010
1011template <class T>
1012template <
1013 class T2,
1014 typename std::enable_if<
1015 !std::is_same<T, typename std::decay<T2>::type>::value &&
1016 std::is_constructible<T, T2&&>::value,
1017 int>::type>
1018Future<T>& Future<T>::operator=(Future<T2>&& other) {
1019 return operator=(
1020 std::move(other).thenValue([](T2&& v) { return T(std::move(v)); }));
1021}
1022
1023// unwrap
1024
1025template <class T>
1026template <class F>
1027typename std::
1028 enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
1029 Future<T>::unwrap() && {
1030 return std::move(*this).thenValue(
1031 [](Future<typename isFuture<T>::Inner> internal_future) {
1032 return internal_future;
1033 });
1034}
1035
1036template <class T>
1037Future<T> Future<T>::via(Executor::KeepAlive<> executor, int8_t priority) && {
1038 this->setExecutor(std::move(executor), priority);
1039
1040 auto newFuture = Future<T>(this->core_);
1041 this->core_ = nullptr;
1042 return newFuture;
1043}
1044
1045template <class T>
1046Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
1047 return std::move(*this).via(getKeepAliveToken(executor), priority);
1048}
1049
1050template <class T>
1051Future<T> Future<T>::via(Executor::KeepAlive<> executor, int8_t priority) & {
1052 this->throwIfInvalid();
1053 Promise<T> p;
1054 auto sf = p.getSemiFuture();
1055 auto func = [p = std::move(p)](Try<T>&& t) mutable {
1056 p.setTry(std::move(t));
1057 };
1058 using R = futures::detail::callableResult<T, decltype(func)>;
1059 this->thenImplementation(std::move(func), R{});
1060 // Construct future from semifuture manually because this may not have
1061 // an executor set due to legacy code. This means we can bypass the executor
1062 // check in SemiFuture::via
1063 auto f = Future<T>(sf.core_);
1064 sf.core_ = nullptr;
1065 return std::move(f).via(std::move(executor), priority);
1066}
1067
1068template <class T>
1069Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
1070 return via(getKeepAliveToken(executor), priority);
1071}
1072
1073template <typename T>
1074template <typename R, typename Caller, typename... Args>
1075Future<typename isFuture<R>::Inner> Future<T>::then(
1076 R (Caller::*func)(Args...),
1077 Caller* instance) && {
1078 typedef typename std::remove_cv<typename std::remove_reference<
1079 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
1080 FirstArg;
1081
1082 return std::move(*this).thenTry([instance, func](Try<T>&& t) {
1083 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
1084 });
1085}
1086
1087template <class T>
1088template <typename F>
1089Future<typename futures::detail::tryCallableResult<T, F>::value_type>
1090Future<T>::thenTry(F&& func) && {
1091 auto lambdaFunc = [f = std::forward<F>(func)](folly::Try<T>&& t) mutable {
1092 return std::forward<F>(f)(std::move(t));
1093 };
1094 using R = futures::detail::tryCallableResult<T, decltype(lambdaFunc)>;
1095 return this->thenImplementation(std::move(lambdaFunc), R{});
1096}
1097
1098template <class T>
1099template <typename F>
1100Future<typename futures::detail::valueCallableResult<T, F>::value_type>
1101Future<T>::thenValue(F&& func) && {
1102 auto lambdaFunc = [f = std::forward<F>(func)](folly::Try<T>&& t) mutable {
1103 return std::forward<F>(f)(
1104 t.template get<
1105 false,
1106 typename futures::detail::valueCallableResult<T, F>::FirstArg>());
1107 };
1108 using R = futures::detail::tryCallableResult<T, decltype(lambdaFunc)>;
1109 return this->thenImplementation(std::move(lambdaFunc), R{});
1110}
1111
1112template <class T>
1113template <class ExceptionType, class F>
1114Future<T> Future<T>::thenError(F&& func) && {
1115 // Forward to onError but ensure that returned future carries the executor
1116 // Allow for applying to future with null executor while this is still
1117 // possible.
1118 auto* ePtr = this->getExecutor();
1119 auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());
1120
1121 FOLLY_PUSH_WARNING
1122 FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
1123 return std::move(*this)
1124 .onError([func = std::forward<F>(func)](ExceptionType& ex) mutable {
1125 return std::forward<F>(func)(ex);
1126 })
1127 .via(std::move(e));
1128 FOLLY_POP_WARNING
1129}
1130
1131template <class T>
1132template <class F>
1133Future<T> Future<T>::thenError(F&& func) && {
1134 // Forward to onError but ensure that returned future carries the executor
1135 // Allow for applying to future with null executor while this is still
1136 // possible.
1137 auto* ePtr = this->getExecutor();
1138 auto e = folly::getKeepAliveToken(ePtr ? *ePtr : InlineExecutor::instance());
1139
1140 FOLLY_PUSH_WARNING
1141 FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
1142 return std::move(*this)
1143 .onError([func = std::forward<F>(func)](
1144 folly::exception_wrapper&& ex) mutable {
1145 return std::forward<F>(func)(std::move(ex));
1146 })
1147 .via(std::move(e));
1148 FOLLY_POP_WARNING
1149}
1150
1151template <class T>
1152Future<Unit> Future<T>::then() && {
1153 return std::move(*this).thenValue([](T&&) {});
1154}
1155
1156// onError where the callback returns T
1157template <class T>
1158template <class F>
1159typename std::enable_if<
1160 !is_invocable<F, exception_wrapper>::value &&
1161 !futures::detail::Extract<F>::ReturnsFuture::value,
1162 Future<T>>::type
1163Future<T>::onError(F&& func) && {
1164 typedef std::remove_reference_t<
1165 typename futures::detail::Extract<F>::FirstArg>
1166 Exn;
1167 static_assert(
1168 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
1169 "Return type of onError callback must be T or Future<T>");
1170
1171 Promise<T> p;
1172 p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler());
1173 auto sf = p.getSemiFuture();
1174
1175 this->setCallback_(
1176 [state = futures::detail::makeCoreCallbackState(
1177 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
1178 if (auto e = t.template tryGetExceptionObject<Exn>()) {
1179 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
1180 } else {
1181 state.setTry(std::move(t));
1182 }
1183 });
1184
1185 // Allow for applying to future with null executor while this is still
1186 // possible.
1187 // TODO(T26801487): Should have an executor
1188 return std::move(sf).via(&InlineExecutor::instance());
1189}
1190
1191// onError where the callback returns Future<T>
1192template <class T>
1193template <class F>
1194typename std::enable_if<
1195 !is_invocable<F, exception_wrapper>::value &&
1196 futures::detail::Extract<F>::ReturnsFuture::value,
1197 Future<T>>::type
1198Future<T>::onError(F&& func) && {
1199 static_assert(
1200 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
1201 value,
1202 "Return type of onError callback must be T or Future<T>");
1203 typedef std::remove_reference_t<
1204 typename futures::detail::Extract<F>::FirstArg>
1205 Exn;
1206
1207 Promise<T> p;
1208 auto sf = p.getSemiFuture();
1209
1210 this->setCallback_(
1211 [state = futures::detail::makeCoreCallbackState(
1212 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
1213 if (auto e = t.template tryGetExceptionObject<Exn>()) {
1214 auto tf2 = state.tryInvoke(*e);
1215 if (tf2.hasException()) {
1216 state.setException(std::move(tf2.exception()));
1217 } else {
1218 tf2->setCallback_([p = state.stealPromise()](Try<T>&& t3) mutable {
1219 p.setTry(std::move(t3));
1220 });
1221 }
1222 } else {
1223 state.setTry(std::move(t));
1224 }
1225 });
1226
1227 // Allow for applying to future with null executor while this is still
1228 // possible.
1229 // TODO(T26801487): Should have an executor
1230 return std::move(sf).via(&InlineExecutor::instance());
1231}
1232
1233template <class T>
1234template <class F>
1235Future<T> Future<T>::ensure(F&& func) && {
1236 return std::move(*this).thenTry(
1237 [funcw = std::forward<F>(func)](Try<T>&& t) mutable {
1238 std::forward<F>(funcw)();
1239 return makeFuture(std::move(t));
1240 });
1241}
1242
1243template <class T>
1244template <class F>
1245Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) && {
1246 return std::move(*this).within(dur, tk).template thenError<FutureTimeout>(
1247 [funcw = std::forward<F>(func)](auto const&) mutable {
1248 return std::forward<F>(funcw)();
1249 });
1250}
1251
1252template <class T>
1253template <class F>
1254typename std::enable_if<
1255 is_invocable<F, exception_wrapper>::value &&
1256 futures::detail::Extract<F>::ReturnsFuture::value,
1257 Future<T>>::type
1258Future<T>::onError(F&& func) && {
1259 static_assert(
1260 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
1261 value,
1262 "Return type of onError callback must be T or Future<T>");
1263
1264 Promise<T> p;
1265 auto sf = p.getSemiFuture();
1266 this->setCallback_(
1267 [state = futures::detail::makeCoreCallbackState(
1268 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
1269 if (t.hasException()) {
1270 auto tf2 = state.tryInvoke(std::move(t.exception()));
1271 if (tf2.hasException()) {
1272 state.setException(std::move(tf2.exception()));
1273 } else {
1274 tf2->setCallback_([p = state.stealPromise()](Try<T>&& t3) mutable {
1275 p.setTry(std::move(t3));
1276 });
1277 }
1278 } else {
1279 state.setTry(std::move(t));
1280 }
1281 });
1282
1283 // Allow for applying to future with null executor while this is still
1284 // possible.
1285 // TODO(T26801487): Should have an executor
1286 return std::move(sf).via(&InlineExecutor::instance());
1287}
1288
1289// onError(exception_wrapper) that returns T
1290template <class T>
1291template <class F>
1292typename std::enable_if<
1293 is_invocable<F, exception_wrapper>::value &&
1294 !futures::detail::Extract<F>::ReturnsFuture::value,
1295 Future<T>>::type
1296Future<T>::onError(F&& func) && {
1297 static_assert(
1298 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
1299 value,
1300 "Return type of onError callback must be T or Future<T>");
1301
1302 Promise<T> p;
1303 auto sf = p.getSemiFuture();
1304 this->setCallback_(
1305 [state = futures::detail::makeCoreCallbackState(
1306 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
1307 if (t.hasException()) {
1308 state.setTry(makeTryWith(
1309 [&] { return state.invoke(std::move(t.exception())); }));
1310 } else {
1311 state.setTry(std::move(t));
1312 }
1313 });
1314
1315 // Allow for applying to future with null executor while this is still
1316 // possible.
1317 // TODO(T26801487): Should have an executor
1318 return std::move(sf).via(&InlineExecutor::instance());
1319}
1320
1321template <class Func>
1322auto via(Executor* x, Func&& func) -> Future<
1323 typename isFutureOrSemiFuture<decltype(std::declval<Func>()())>::Inner> {
1324 // TODO make this actually more performant. :-P #7260175
1325 return via(x).thenValue([f = std::forward<Func>(func)](auto&&) mutable {
1326 return std::forward<Func>(f)();
1327 });
1328}
1329
1330template <class Func>
1331auto via(Executor::KeepAlive<> x, Func&& func) -> Future<
1332 typename isFutureOrSemiFuture<decltype(std::declval<Func>()())>::Inner> {
1333 return via(std::move(x))
1334 .thenValue([f = std::forward<Func>(func)](auto&&) mutable {
1335 return std::forward<Func>(f)();
1336 });
1337}
1338
1339// makeFuture
1340
1341template <class T>
1342Future<typename std::decay<T>::type> makeFuture(T&& t) {
1343 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
1344}
1345
1346inline Future<Unit> makeFuture() {
1347 return makeFuture(Unit{});
1348}
1349
1350// makeFutureWith(Future<T>()) -> Future<T>
1351template <class F>
1352typename std::
1353 enable_if<isFuture<invoke_result_t<F>>::value, invoke_result_t<F>>::type
1354 makeFutureWith(F&& func) {
1355 using InnerType = typename isFuture<invoke_result_t<F>>::Inner;
1356 try {
1357 return std::forward<F>(func)();
1358 } catch (std::exception& e) {
1359 return makeFuture<InnerType>(
1360 exception_wrapper(std::current_exception(), e));
1361 } catch (...) {
1362 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
1363 }
1364}
1365
1366// makeFutureWith(T()) -> Future<T>
1367// makeFutureWith(void()) -> Future<Unit>
1368template <class F>
1369typename std::enable_if<
1370 !(isFuture<invoke_result_t<F>>::value),
1371 Future<lift_unit_t<invoke_result_t<F>>>>::type
1372makeFutureWith(F&& func) {
1373 using LiftedResult = lift_unit_t<invoke_result_t<F>>;
1374 return makeFuture<LiftedResult>(
1375 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
1376}
1377
1378template <class T>
1379Future<T> makeFuture(std::exception_ptr const& e) {
1380 return makeFuture(Try<T>(e));
1381}
1382
1383template <class T>
1384Future<T> makeFuture(exception_wrapper ew) {
1385 return makeFuture(Try<T>(std::move(ew)));
1386}
1387
1388template <class T, class E>
1389typename std::enable_if<std::is_base_of<std::exception, E>::value, Future<T>>::
1390 type
1391 makeFuture(E const& e) {
1392 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
1393}
1394
1395template <class T>
1396Future<T> makeFuture(Try<T> t) {
1397 return Future<T>(Future<T>::Core::make(std::move(t)));
1398}
1399
1400// via
1401Future<Unit> via(Executor* executor, int8_t priority) {
1402 return makeFuture().via(executor, priority);
1403}
1404
1405Future<Unit> via(Executor::KeepAlive<> executor, int8_t priority) {
1406 return makeFuture().via(std::move(executor), priority);
1407}
1408
1409namespace futures {
1410namespace detail {
1411
1412template <typename V, typename... Fs, std::size_t... Is>
1413FOLLY_ALWAYS_INLINE FOLLY_ATTR_VISIBILITY_HIDDEN void
1414foreach_(index_sequence<Is...>, V&& v, Fs&&... fs) {
1415 using _ = int[];
1416 void(_{0, (void(v(index_constant<Is>{}, static_cast<Fs&&>(fs))), 0)...});
1417}
1418template <typename V, typename... Fs>
1419FOLLY_ALWAYS_INLINE FOLLY_ATTR_VISIBILITY_HIDDEN void foreach(
1420 V&& v,
1421 Fs&&... fs) {
1422 using _ = index_sequence_for<Fs...>;
1423 foreach_(_{}, static_cast<V&&>(v), static_cast<Fs&&>(fs)...);
1424}
1425
1426template <typename T>
1427DeferredExecutor* getDeferredExecutor(SemiFuture<T>& future) {
1428 return future.getDeferredExecutor();
1429}
1430
1431template <typename T>
1432folly::Executor::KeepAlive<DeferredExecutor> stealDeferredExecutor(
1433 SemiFuture<T>& future) {
1434 return future.stealDeferredExecutor();
1435}
1436
1437template <typename T>
1438folly::Executor::KeepAlive<DeferredExecutor> stealDeferredExecutor(Future<T>&) {
1439 return {};
1440}
1441
1442template <typename... Ts>
1443void stealDeferredExecutorsVariadic(
1444 std::vector<folly::Executor::KeepAlive<DeferredExecutor>>& executors,
1445 Ts&... ts) {
1446 auto foreach = [&](auto& future) {
1447 if (auto executor = stealDeferredExecutor(future)) {
1448 executors.push_back(std::move(executor));
1449 }
1450 return folly::unit;
1451 };
1452 [](...) {}(foreach(ts)...);
1453}
1454
1455template <class InputIterator>
1456void stealDeferredExecutors(
1457 std::vector<folly::Executor::KeepAlive<DeferredExecutor>>& executors,
1458 InputIterator first,
1459 InputIterator last) {
1460 for (auto it = first; it != last; ++it) {
1461 if (auto executor = stealDeferredExecutor(*it)) {
1462 executors.push_back(std::move(executor));
1463 }
1464 }
1465}
1466} // namespace detail
1467} // namespace futures
1468
1469// collectAll (variadic)
1470
1471template <typename... Fs>
1472SemiFuture<std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...>>
1473collectAllSemiFuture(Fs&&... fs) {
1474 using Result = std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...>;
1475 struct Context {
1476 ~Context() {
1477 p.setValue(std::move(results));
1478 }
1479 Promise<Result> p;
1480 Result results;
1481 };
1482
1483 std::vector<folly::Executor::KeepAlive<futures::detail::DeferredExecutor>>
1484 executors;
1485 futures::detail::stealDeferredExecutorsVariadic(executors, fs...);
1486
1487 auto ctx = std::make_shared<Context>();
1488 futures::detail::foreach(
1489 [&](auto i, auto&& f) {
1490 f.setCallback_([i, ctx](auto&& t) {
1491 std::get<i.value>(ctx->results) = std::move(t);
1492 });
1493 },
1494 static_cast<Fs&&>(fs)...);
1495
1496 auto future = ctx->p.getSemiFuture();
1497 if (!executors.empty()) {
1498 auto work = [](Try<typename decltype(future)::value_type>&& t) {
1499 return std::move(t).value();
1500 };
1501 future = std::move(future).defer(work);
1502 auto deferredExecutor = futures::detail::getDeferredExecutor(future);
1503 deferredExecutor->setNestedExecutors(std::move(executors));
1504 }
1505 return future;
1506}
1507
1508template <typename... Fs>
1509Future<std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...>> collectAll(
1510 Fs&&... fs) {
1511 return collectAllSemiFuture(std::forward<Fs>(fs)...).toUnsafeFuture();
1512}
1513
1514// collectAll (iterator)
1515
1516template <class InputIterator>
1517SemiFuture<std::vector<
1518 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
1519collectAllSemiFuture(InputIterator first, InputIterator last) {
1520 using F = typename std::iterator_traits<InputIterator>::value_type;
1521 using T = typename F::value_type;
1522
1523 struct Context {
1524 explicit Context(size_t n) : results(n) {}
1525 ~Context() {
1526 p.setValue(std::move(results));
1527 }
1528 Promise<std::vector<Try<T>>> p;
1529 std::vector<Try<T>> results;
1530 };
1531
1532 std::vector<folly::Executor::KeepAlive<futures::detail::DeferredExecutor>>
1533 executors;
1534 futures::detail::stealDeferredExecutors(executors, first, last);
1535
1536 auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
1537
1538 for (size_t i = 0; first != last; ++first, ++i) {
1539 first->setCallback_(
1540 [i, ctx](Try<T>&& t) { ctx->results[i] = std::move(t); });
1541 }
1542
1543 auto future = ctx->p.getSemiFuture();
1544 if (!executors.empty()) {
1545 future = std::move(future).defer(
1546 [](Try<typename decltype(future)::value_type>&& t) {
1547 return std::move(t).value();
1548 });
1549 auto deferredExecutor = futures::detail::getDeferredExecutor(future);
1550 deferredExecutor->setNestedExecutors(std::move(executors));
1551 }
1552 return future;
1553}
1554
1555template <class InputIterator>
1556Future<std::vector<
1557 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
1558collectAll(InputIterator first, InputIterator last) {
1559 return collectAllSemiFuture(first, last).toUnsafeFuture();
1560}
1561
1562// collect (iterator)
1563
1564// TODO(T26439406): Make return SemiFuture
1565template <class InputIterator>
1566Future<std::vector<
1567 typename std::iterator_traits<InputIterator>::value_type::value_type>>
1568collect(InputIterator first, InputIterator last) {
1569 using F = typename std::iterator_traits<InputIterator>::value_type;
1570 using T = typename F::value_type;
1571
1572 struct Context {
1573 explicit Context(size_t n) : result(n) {
1574 finalResult.reserve(n);
1575 }
1576 ~Context() {
1577 if (!threw.load(std::memory_order_relaxed)) {
1578 // map Optional<T> -> T
1579 std::transform(
1580 result.begin(),
1581 result.end(),
1582 std::back_inserter(finalResult),
1583 [](Optional<T>& o) { return std::move(o.value()); });
1584 p.setValue(std::move(finalResult));
1585 }
1586 }
1587 Promise<std::vector<T>> p;
1588 std::vector<Optional<T>> result;
1589 std::vector<T> finalResult;
1590 std::atomic<bool> threw{false};
1591 };
1592
1593 auto ctx = std::make_shared<Context>(std::distance(first, last));
1594 for (size_t i = 0; first != last; ++first, ++i) {
1595 first->setCallback_([i, ctx](Try<T>&& t) {
1596 if (t.hasException()) {
1597 if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
1598 ctx->p.setException(std::move(t.exception()));
1599 }
1600 } else if (!ctx->threw.load(std::memory_order_relaxed)) {
1601 ctx->result[i] = std::move(t.value());
1602 }
1603 });
1604 }
1605 return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
1606}
1607
1608// collect (variadic)
1609
1610// TODO(T26439406): Make return SemiFuture
1611template <typename... Fs>
1612Future<std::tuple<typename remove_cvref_t<Fs>::value_type...>> collect(
1613 Fs&&... fs) {
1614 using Result = std::tuple<typename remove_cvref_t<Fs>::value_type...>;
1615 struct Context {
1616 ~Context() {
1617 if (!threw.load(std::memory_order_relaxed)) {
1618 p.setValue(unwrapTryTuple(std::move(results)));
1619 }
1620 }
1621 Promise<Result> p;
1622 std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...> results;
1623 std::atomic<bool> threw{false};
1624 };
1625
1626 auto ctx = std::make_shared<Context>();
1627 futures::detail::foreach(
1628 [&](auto i, auto&& f) {
1629 f.setCallback_([i, ctx](auto&& t) {
1630 if (t.hasException()) {
1631 if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
1632 ctx->p.setException(std::move(t.exception()));
1633 }
1634 } else if (!ctx->threw.load(std::memory_order_relaxed)) {
1635 std::get<i.value>(ctx->results) = std::move(t);
1636 }
1637 });
1638 },
1639 static_cast<Fs&&>(fs)...);
1640 return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
1641}
1642
1643// collectAny (iterator)
1644
1645// TODO(T26439406): Make return SemiFuture
1646template <class InputIterator>
1647Future<std::pair<
1648 size_t,
1649 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
1650collectAny(InputIterator first, InputIterator last) {
1651 using F = typename std::iterator_traits<InputIterator>::value_type;
1652 using T = typename F::value_type;
1653
1654 struct Context {
1655 Promise<std::pair<size_t, Try<T>>> p;
1656 std::atomic<bool> done{false};
1657 };
1658
1659 auto ctx = std::make_shared<Context>();
1660 for (size_t i = 0; first != last; ++first, ++i) {
1661 first->setCallback_([i, ctx](Try<T>&& t) {
1662 if (!ctx->done.exchange(true, std::memory_order_relaxed)) {
1663 ctx->p.setValue(std::make_pair(i, std::move(t)));
1664 }
1665 });
1666 }
1667 return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
1668}
1669
1670// collectAnyWithoutException (iterator)
1671
1672template <class InputIterator>
1673SemiFuture<std::pair<
1674 size_t,
1675 typename std::iterator_traits<InputIterator>::value_type::value_type>>
1676collectAnyWithoutException(InputIterator first, InputIterator last) {
1677 using F = typename std::iterator_traits<InputIterator>::value_type;
1678 using T = typename F::value_type;
1679
1680 struct Context {
1681 Context(size_t n) : nTotal(n) {}
1682 Promise<std::pair<size_t, T>> p;
1683 std::atomic<bool> done{false};
1684 std::atomic<size_t> nFulfilled{0};
1685 size_t nTotal;
1686 };
1687
1688 std::vector<folly::Executor::KeepAlive<futures::detail::DeferredExecutor>>
1689 executors;
1690 futures::detail::stealDeferredExecutors(executors, first, last);
1691
1692 auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
1693 for (size_t i = 0; first != last; ++first, ++i) {
1694 first->setCallback_([i, ctx](Try<T>&& t) {
1695 if (!t.hasException() &&
1696 !ctx->done.exchange(true, std::memory_order_relaxed)) {
1697 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
1698 } else if (
1699 ctx->nFulfilled.fetch_add(1, std::memory_order_relaxed) + 1 ==
1700 ctx->nTotal) {
1701 ctx->p.setException(t.exception());
1702 }
1703 });
1704 }
1705
1706 auto future = ctx->p.getSemiFuture();
1707 if (!executors.empty()) {
1708 future = std::move(future).defer(
1709 [](Try<typename decltype(future)::value_type>&& t) {
1710 return std::move(t).value();
1711 });
1712 auto deferredExecutor = futures::detail::getDeferredExecutor(future);
1713 deferredExecutor->setNestedExecutors(std::move(executors));
1714 }
1715 return future;
1716}
1717
1718// collectN (iterator)
1719
1720template <class InputIterator>
1721SemiFuture<std::vector<std::pair<
1722 size_t,
1723 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
1724collectN(InputIterator first, InputIterator last, size_t n) {
1725 using F = typename std::iterator_traits<InputIterator>::value_type;
1726 using T = typename F::value_type;
1727 using Result = std::vector<std::pair<size_t, Try<T>>>;
1728
1729 struct Context {
1730 explicit Context(size_t numFutures, size_t min_)
1731 : v(numFutures), min(min_) {}
1732
1733 std::vector<Optional<Try<T>>> v;
1734 size_t min;
1735 std::atomic<size_t> completed = {0}; // # input futures completed
1736 std::atomic<size_t> stored = {0}; // # output values stored
1737 Promise<Result> p;
1738 };
1739
1740 assert(n > 0);
1741 assert(std::distance(first, last) >= 0);
1742
1743 if (size_t(std::distance(first, last)) < n) {
1744 return SemiFuture<Result>(
1745 exception_wrapper(std::runtime_error("Not enough futures")));
1746 }
1747
1748 // for each completed Future, increase count and add to vector, until we
1749 // have n completed futures at which point we fulfil our Promise with the
1750 // vector
1751 auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)), n);
1752 for (size_t i = 0; first != last; ++first, ++i) {
1753 first->setCallback_([i, ctx](Try<T>&& t) {
1754 // relaxed because this guards control but does not guard data
1755 auto const c = 1 + ctx->completed.fetch_add(1, std::memory_order_relaxed);
1756 if (c > ctx->min) {
1757 return;
1758 }
1759 ctx->v[i] = std::move(t);
1760
1761 // release because the stored values in all threads must be visible below
1762 // acquire because no stored value is permitted to be fetched early
1763 auto const s = 1 + ctx->stored.fetch_add(1, std::memory_order_acq_rel);
1764 if (s < ctx->min) {
1765 return;
1766 }
1767 Result result;
1768 result.reserve(ctx->completed.load());
1769 for (size_t j = 0; j < ctx->v.size(); ++j) {
1770 auto& entry = ctx->v[j];
1771 if (entry.hasValue()) {
1772 result.emplace_back(j, std::move(entry).value());
1773 }
1774 }
1775 ctx->p.setTry(Try<Result>(std::move(result)));
1776 });
1777 }
1778
1779 return ctx->p.getSemiFuture();
1780}
1781
1782// reduce (iterator)
1783
1784template <class It, class T, class F>
1785Future<T> reduce(It first, It last, T&& initial, F&& func) {
1786 if (first == last) {
1787 return makeFuture(std::forward<T>(initial));
1788 }
1789
1790 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1791 typedef typename std::
1792 conditional<is_invocable<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type
1793 Arg;
1794 typedef isTry<Arg> IsTry;
1795
1796 auto sfunc = std::make_shared<std::decay_t<F>>(std::forward<F>(func));
1797
1798 auto f = std::move(*first).thenTry(
1799 [initial = std::forward<T>(initial), sfunc](Try<ItT>&& head) mutable {
1800 return (*sfunc)(
1801 std::move(initial), head.template get<IsTry::value, Arg&&>());
1802 });
1803
1804 for (++first; first != last; ++first) {
1805 f = collectAllSemiFuture(f, *first).toUnsafeFuture().thenValue(
1806 [sfunc](std::tuple<Try<T>, Try<ItT>>&& t) {
1807 return (*sfunc)(
1808 std::move(std::get<0>(t).value()),
1809 // Either return a ItT&& or a Try<ItT>&& depending
1810 // on the type of the argument of func.
1811 std::get<1>(t).template get<IsTry::value, Arg&&>());
1812 });
1813 }
1814
1815 return f;
1816}
1817
1818// window (collection)
1819
1820template <class Collection, class F, class ItT, class Result>
1821std::vector<Future<Result>> window(Collection input, F func, size_t n) {
1822 // Use global QueuedImmediateExecutor singleton to avoid stack overflow.
1823 auto executor = &QueuedImmediateExecutor::instance();
1824 return window(executor, std::move(input), std::move(func), n);
1825}
1826
1827template <class F>
1828auto window(size_t times, F func, size_t n)
1829 -> std::vector<invoke_result_t<F, size_t>> {
1830 return window(futures::detail::WindowFakeVector(times), std::move(func), n);
1831}
1832
1833template <class Collection, class F, class ItT, class Result>
1834std::vector<Future<Result>>
1835window(Executor* executor, Collection input, F func, size_t n) {
1836 return window(
1837 getKeepAliveToken(executor), std::move(input), std::move(func), n);
1838}
1839
1840template <class Collection, class F, class ItT, class Result>
1841std::vector<Future<Result>>
1842window(Executor::KeepAlive<> executor, Collection input, F func, size_t n) {
1843 struct WindowContext {
1844 WindowContext(
1845 Executor::KeepAlive<> executor_,
1846 Collection&& input_,
1847 F&& func_)
1848 : executor(std::move(executor_)),
1849 input(std::move(input_)),
1850 promises(input.size()),
1851 func(std::move(func_)) {}
1852 std::atomic<size_t> i{0};
1853 Executor::KeepAlive<> executor;
1854 Collection input;
1855 std::vector<Promise<Result>> promises;
1856 F func;
1857
1858 static void spawn(std::shared_ptr<WindowContext> ctx) {
1859 size_t i = ctx->i.fetch_add(1, std::memory_order_relaxed);
1860 if (i < ctx->input.size()) {
1861 auto fut = makeSemiFutureWith(
1862 [&] { return ctx->func(std::move(ctx->input[i])); })
1863 .via(ctx->executor.get());
1864
1865 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
1866 ctx->promises[i].setTry(std::move(t));
1867 // Chain another future onto this one
1868 spawn(std::move(ctx));
1869 });
1870 }
1871 }
1872 };
1873
1874 auto max = std::min(n, input.size());
1875
1876 auto ctx = std::make_shared<WindowContext>(
1877 executor.copy(), std::move(input), std::move(func));
1878
1879 // Start the first n Futures
1880 for (size_t i = 0; i < max; ++i) {
1881 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1882 }
1883
1884 std::vector<Future<Result>> futures;
1885 futures.reserve(ctx->promises.size());
1886 for (auto& promise : ctx->promises) {
1887 futures.emplace_back(promise.getSemiFuture().via(executor.copy()));
1888 }
1889
1890 return futures;
1891}
1892
1893// reduce
1894
1895template <class T>
1896template <class I, class F>
1897Future<I> Future<T>::reduce(I&& initial, F&& func) && {
1898 return std::move(*this).thenValue(
1899 [minitial = std::forward<I>(initial),
1900 mfunc = std::forward<F>(func)](T&& vals) mutable {
1901 auto ret = std::move(minitial);
1902 for (auto& val : vals) {
1903 ret = mfunc(std::move(ret), std::move(val));
1904 }
1905 return ret;
1906 });
1907}
1908
1909// unorderedReduce (iterator)
1910
1911// TODO(T26439406): Make return SemiFuture
1912template <class It, class T, class F>
1913Future<T> unorderedReduce(It first, It last, T initial, F func) {
1914 using ItF = typename std::iterator_traits<It>::value_type;
1915 using ItT = typename ItF::value_type;
1916 using Arg = MaybeTryArg<F, T, ItT>;
1917
1918 if (first == last) {
1919 return makeFuture(std::move(initial));
1920 }
1921
1922 typedef isTry<Arg> IsTry;
1923
1924 struct Context {
1925 Context(T&& memo, F&& fn, size_t n)
1926 : lock_(),
1927 memo_(makeFuture<T>(std::move(memo))),
1928 func_(std::move(fn)),
1929 numThens_(0),
1930 numFutures_(n),
1931 promise_() {}
1932
1933 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1934 Future<T> memo_;
1935 F func_;
1936 size_t numThens_; // how many Futures completed and called .then()
1937 size_t numFutures_; // how many Futures in total
1938 Promise<T> promise_;
1939 };
1940
1941 struct Fulfill {
1942 void operator()(Promise<T>&& p, T&& v) const {
1943 p.setValue(std::move(v));
1944 }
1945 void operator()(Promise<T>&& p, Future<T>&& f) const {
1946 f.setCallback_(
1947 [p = std::move(p)](Try<T>&& t) mutable { p.setTry(std::move(t)); });
1948 }
1949 };
1950
1951 auto ctx = std::make_shared<Context>(
1952 std::move(initial), std::move(func), std::distance(first, last));
1953 for (size_t i = 0; first != last; ++first, ++i) {
1954 first->setCallback_([i, ctx](Try<ItT>&& t) {
1955 (void)i;
1956 // Futures can be completed in any order, simultaneously.
1957 // To make this non-blocking, we create a new Future chain in
1958 // the order of completion to reduce the values.
1959 // The spinlock just protects chaining a new Future, not actually
1960 // executing the reduce, which should be really fast.
1961 Promise<T> p;
1962 auto f = p.getFuture();
1963 {
1964 folly::MSLGuard lock(ctx->lock_);
1965 f = exchange(ctx->memo_, std::move(f));
1966 if (++ctx->numThens_ == ctx->numFutures_) {
1967 // After reducing the value of the last Future, fulfill the Promise
1968 ctx->memo_.setCallback_(
1969 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1970 }
1971 }
1972 f.setCallback_(
1973 [ctx, mp = std::move(p), mt = std::move(t)](Try<T>&& v) mutable {
1974 if (v.hasValue()) {
1975 try {
1976 Fulfill{}(
1977 std::move(mp),
1978 ctx->func_(
1979 std::move(v.value()),
1980 mt.template get<IsTry::value, Arg&&>()));
1981 } catch (std::exception& e) {
1982 mp.setException(exception_wrapper(std::current_exception(), e));
1983 } catch (...) {
1984 mp.setException(exception_wrapper(std::current_exception()));
1985 }
1986 } else {
1987 mp.setTry(std::move(v));
1988 }
1989 });
1990 });
1991 }
1992 return ctx->promise_.getSemiFuture().via(&InlineExecutor::instance());
1993}
1994
1995// within
1996
1997template <class T>
1998Future<T> Future<T>::within(Duration dur, Timekeeper* tk) && {
1999 return std::move(*this).within(dur, FutureTimeout(), tk);
2000}
2001
2002template <class T>
2003template <class E>
2004Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) && {
2005 if (this->isReady()) {
2006 return std::move(*this);
2007 }
2008
2009 auto* exe = this->getExecutor();
2010 return std::move(*this)
2011 .withinImplementation(dur, e, tk)
2012 .via(exe ? exe : &InlineExecutor::instance());
2013}
2014
2015// delayed
2016
2017template <class T>
2018Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) && {
2019 auto e = this->getExecutor();
2020 return collectAllSemiFuture(*this, futures::sleep(dur, tk))
2021 .via(e ? e : &InlineExecutor::instance())
2022 .thenValue([](std::tuple<Try<T>, Try<Unit>>&& tup) {
2023 return makeFuture<T>(std::get<0>(std::move(tup)));
2024 });
2025}
2026
2027template <class T>
2028Future<T> Future<T>::delayedUnsafe(Duration dur, Timekeeper* tk) {
2029 return std::move(*this).semi().delayed(dur, tk).toUnsafeFuture();
2030}
2031
2032namespace futures {
2033namespace detail {
2034
2035template <class FutureType, typename T = typename FutureType::value_type>
2036void waitImpl(FutureType& f) {
2037 if (std::is_base_of<Future<T>, FutureType>::value) {
2038 f = std::move(f).via(&InlineExecutor::instance());
2039 }
2040 // short-circuit if there's nothing to do
2041 if (f.isReady()) {
2042 return;
2043 }
2044
2045 Promise<T> promise;
2046 auto ret = promise.getSemiFuture();
2047 auto baton = std::make_shared<FutureBatonType>();
2048 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
2049 promise.setTry(std::move(t));
2050 baton->post();
2051 });
2052 convertFuture(std::move(ret), f);
2053 baton->wait();
2054 assert(f.isReady());
2055}
2056
2057template <class T>
2058void convertFuture(SemiFuture<T>&& sf, Future<T>& f) {
2059 // Carry executor from f, inserting an inline executor if it did not have one
2060 auto* exe = f.getExecutor();
2061 f = std::move(sf).via(exe ? exe : &InlineExecutor::instance());
2062}
2063
2064template <class T>
2065void convertFuture(SemiFuture<T>&& sf, SemiFuture<T>& f) {
2066 f = std::move(sf);
2067}
2068
2069template <class FutureType, typename T = typename FutureType::value_type>
2070void waitImpl(FutureType& f, Duration dur) {
2071 if (std::is_base_of<Future<T>, FutureType>::value) {
2072 f = std::move(f).via(&InlineExecutor::instance());
2073 }
2074 // short-circuit if there's nothing to do
2075 if (f.isReady()) {
2076 return;
2077 }
2078
2079 Promise<T> promise;
2080 auto ret = promise.getSemiFuture();
2081 auto baton = std::make_shared<FutureBatonType>();
2082 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
2083 promise.setTry(std::move(t));
2084 baton->post();
2085 });
2086 convertFuture(std::move(ret), f);
2087 if (baton->try_wait_for(dur)) {
2088 assert(f.isReady());
2089 }
2090}
2091
2092template <class T>
2093void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
2094 // Set callback so to ensure that the via executor has something on it
2095 // so that once the preceding future triggers this callback, drive will
2096 // always have a callback to satisfy it
2097 if (f.isReady()) {
2098 return;
2099 }
2100 f = std::move(f).via(e).thenValue([](T&& t) { return std::move(t); });
2101 while (!f.isReady()) {
2102 e->drive();
2103 }
2104 assert(f.isReady());
2105 f = std::move(f).via(&InlineExecutor::instance());
2106}
2107
2108template <class T, typename Rep, typename Period>
2109void waitViaImpl(
2110 Future<T>& f,
2111 TimedDrivableExecutor* e,
2112 const std::chrono::duration<Rep, Period>& timeout) {
2113 // Set callback so to ensure that the via executor has something on it
2114 // so that once the preceding future triggers this callback, drive will
2115 // always have a callback to satisfy it
2116 if (f.isReady()) {
2117 return;
2118 }
2119 // Chain operations, ensuring that the executor is kept alive for the duration
2120 f = std::move(f).via(e).thenValue(
2121 [keepAlive = getKeepAliveToken(e)](T&& t) { return std::move(t); });
2122 auto now = std::chrono::steady_clock::now();
2123 auto deadline = now + timeout;
2124 while (!f.isReady() && (now < deadline)) {
2125 e->try_drive_until(deadline);
2126 now = std::chrono::steady_clock::now();
2127 }
2128 assert(f.isReady() || (now >= deadline));
2129 if (f.isReady()) {
2130 f = std::move(f).via(&InlineExecutor::instance());
2131 }
2132}
2133
2134} // namespace detail
2135} // namespace futures
2136
2137template <class T>
2138SemiFuture<T>& SemiFuture<T>::wait() & {
2139 if (auto deferredExecutor = getDeferredExecutor()) {
2140 // Make sure that the last callback in the future chain will be run on the
2141 // WaitExecutor.
2142 Promise<T> promise;
2143 auto ret = promise.getSemiFuture();
2144 setCallback_(
2145 [p = std::move(promise)](auto&& r) mutable { p.setTry(std::move(r)); });
2146 auto waitExecutor = futures::detail::WaitExecutor::create();
2147 deferredExecutor->setExecutor(waitExecutor.copy());
2148 while (!ret.isReady()) {
2149 waitExecutor->drive();
2150 }
2151 waitExecutor->detach();
2152 this->detach();
2153 *this = std::move(ret);
2154 } else {
2155 futures::detail::waitImpl(*this);
2156 }
2157 return *this;
2158}
2159
2160template <class T>
2161SemiFuture<T>&& SemiFuture<T>::wait() && {
2162 return std::move(wait());
2163}
2164
2165template <class T>
2166SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
2167 if (auto deferredExecutor = getDeferredExecutor()) {
2168 // Make sure that the last callback in the future chain will be run on the
2169 // WaitExecutor.
2170 Promise<T> promise;
2171 auto ret = promise.getSemiFuture();
2172 setCallback_(
2173 [p = std::move(promise)](auto&& r) mutable { p.setTry(std::move(r)); });
2174 auto waitExecutor = futures::detail::WaitExecutor::create();
2175 auto deadline = futures::detail::WaitExecutor::Clock::now() + dur;
2176 deferredExecutor->setExecutor(waitExecutor.copy());
2177 while (!ret.isReady()) {
2178 if (!waitExecutor->driveUntil(deadline)) {
2179 break;
2180 }
2181 }
2182 waitExecutor->detach();
2183 this->detach();
2184 *this = std::move(ret);
2185 } else {
2186 futures::detail::waitImpl(*this, dur);
2187 }
2188 return *this;
2189}
2190
2191template <class T>
2192bool SemiFuture<T>::wait(Duration dur) && {
2193 auto future = std::move(*this);
2194 future.wait(dur);
2195 return future.isReady();
2196}
2197
2198template <class T>
2199T SemiFuture<T>::get() && {
2200 return std::move(*this).getTry().value();
2201}
2202
2203template <class T>
2204T SemiFuture<T>::get(Duration dur) && {
2205 return std::move(*this).getTry(dur).value();
2206}
2207
2208template <class T>
2209Try<T> SemiFuture<T>::getTry() && {
2210 wait();
2211 auto future = folly::Future<T>(this->core_);
2212 this->core_ = nullptr;
2213 return std::move(std::move(future).getTry());
2214}
2215
2216template <class T>
2217Try<T> SemiFuture<T>::getTry(Duration dur) && {
2218 wait(dur);
2219 auto future = folly::Future<T>(this->core_);
2220 this->core_ = nullptr;
2221
2222 if (!future.isReady()) {
2223 throw_exception<FutureTimeout>();
2224 }
2225 return std::move(std::move(future).getTry());
2226}
2227
2228template <class T>
2229Future<T>& Future<T>::wait() & {
2230 futures::detail::waitImpl(*this);
2231 return *this;
2232}
2233
2234template <class T>
2235Future<T>&& Future<T>::wait() && {
2236 futures::detail::waitImpl(*this);
2237 return std::move(*this);
2238}
2239
2240template <class T>
2241Future<T>& Future<T>::wait(Duration dur) & {
2242 futures::detail::waitImpl(*this, dur);
2243 return *this;
2244}
2245
2246template <class T>
2247Future<T>&& Future<T>::wait(Duration dur) && {
2248 futures::detail::waitImpl(*this, dur);
2249 return std::move(*this);
2250}
2251
2252template <class T>
2253Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
2254 futures::detail::waitViaImpl(*this, e);
2255 return *this;
2256}
2257
2258template <class T>
2259Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
2260 futures::detail::waitViaImpl(*this, e);
2261 return std::move(*this);
2262}
2263
2264template <class T>
2265Future<T>& Future<T>::waitVia(TimedDrivableExecutor* e, Duration dur) & {
2266 futures::detail::waitViaImpl(*this, e, dur);
2267 return *this;
2268}
2269
2270template <class T>
2271Future<T>&& Future<T>::waitVia(TimedDrivableExecutor* e, Duration dur) && {
2272 futures::detail::waitViaImpl(*this, e, dur);
2273 return std::move(*this);
2274}
2275
2276template <class T>
2277T Future<T>::get() && {
2278 wait();
2279 return copy(std::move(*this)).value();
2280}
2281
2282template <class T>
2283T Future<T>::get(Duration dur) && {
2284 wait(dur);
2285 auto future = copy(std::move(*this));
2286 if (!future.isReady()) {
2287 throw_exception<FutureTimeout>();
2288 }
2289 return std::move(future).value();
2290}
2291
2292template <class T>
2293Try<T>& Future<T>::getTry() {
2294 return result();
2295}
2296
2297template <class T>
2298T Future<T>::getVia(DrivableExecutor* e) {
2299 return std::move(waitVia(e).value());
2300}
2301
2302template <class T>
2303T Future<T>::getVia(TimedDrivableExecutor* e, Duration dur) {
2304 waitVia(e, dur);
2305 if (!this->isReady()) {
2306 throw_exception<FutureTimeout>();
2307 }
2308 return std::move(value());
2309}
2310
2311template <class T>
2312Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
2313 return waitVia(e).getTry();
2314}
2315
2316template <class T>
2317Try<T>& Future<T>::getTryVia(TimedDrivableExecutor* e, Duration dur) {
2318 waitVia(e, dur);
2319 if (!this->isReady()) {
2320 throw_exception<FutureTimeout>();
2321 }
2322 return result();
2323}
2324
2325namespace futures {
2326namespace detail {
2327template <class T>
2328struct TryEquals {
2329 static bool equals(const Try<T>& t1, const Try<T>& t2) {
2330 return t1.value() == t2.value();
2331 }
2332};
2333} // namespace detail
2334} // namespace futures
2335
2336template <class T>
2337Future<bool> Future<T>::willEqual(Future<T>& f) {
2338 return collectAllSemiFuture(*this, f).toUnsafeFuture().thenValue(
2339 [](const std::tuple<Try<T>, Try<T>>& t) {
2340 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
2341 return futures::detail::TryEquals<T>::equals(
2342 std::get<0>(t), std::get<1>(t));
2343 } else {
2344 return false;
2345 }
2346 });
2347}
2348
2349template <class T>
2350template <class F>
2351Future<T> Future<T>::filter(F&& predicate) && {
2352 return std::move(*this).thenValue([p = std::forward<F>(predicate)](T val) {
2353 T const& valConstRef = val;
2354 if (!p(valConstRef)) {
2355 throw_exception<FuturePredicateDoesNotObtain>();
2356 }
2357 return val;
2358 });
2359}
2360
2361template <class F>
2362Future<Unit> when(bool p, F&& thunk) {
2363 return p ? std::forward<F>(thunk)().unit() : makeFuture();
2364}
2365
2366template <class P, class F>
2367Future<Unit> whileDo(P&& predicate, F&& thunk) {
2368 if (predicate()) {
2369 auto future = thunk();
2370 return std::move(future).thenValue(
2371 [predicate = std::forward<P>(predicate),
2372 thunk = std::forward<F>(thunk)](auto&&) mutable {
2373 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
2374 });
2375 }
2376 return makeFuture();
2377}
2378
2379template <class F>
2380Future<Unit> times(const int n, F&& thunk) {
2381 return folly::whileDo(
2382 [n, count = std::make_unique<std::atomic<int>>(0)]() mutable {
2383 return count->fetch_add(1, std::memory_order_relaxed) < n;
2384 },
2385 std::forward<F>(thunk));
2386}
2387
2388namespace futures {
2389template <class It, class F, class ItT, class Tag, class Result>
2390std::vector<Future<Result>> mapValue(It first, It last, F func) {
2391 std::vector<Future<Result>> results;
2392 results.reserve(std::distance(first, last));
2393 for (auto it = first; it != last; it++) {
2394 results.push_back(std::move(*it).thenValue(func));
2395 }
2396 return results;
2397}
2398
2399template <class It, class F, class ItT, class Tag, class Result>
2400std::vector<Future<Result>> mapTry(It first, It last, F func, int) {
2401 std::vector<Future<Result>> results;
2402 results.reserve(std::distance(first, last));
2403 for (auto it = first; it != last; it++) {
2404 results.push_back(std::move(*it).thenTry(func));
2405 }
2406 return results;
2407}
2408
2409template <class It, class F, class ItT, class Tag, class Result>
2410std::vector<Future<Result>>
2411mapValue(Executor& exec, It first, It last, F func) {
2412 std::vector<Future<Result>> results;
2413 results.reserve(std::distance(first, last));
2414 for (auto it = first; it != last; it++) {
2415 results.push_back(std::move(*it).via(&exec).thenValue(func));
2416 }
2417 return results;
2418}
2419
2420template <class It, class F, class ItT, class Tag, class Result>
2421std::vector<Future<Result>>
2422mapTry(Executor& exec, It first, It last, F func, int) {
2423 std::vector<Future<Result>> results;
2424 results.reserve(std::distance(first, last));
2425 for (auto it = first; it != last; it++) {
2426 results.push_back(std::move(*it).via(&exec).thenTry(func));
2427 }
2428 return results;
2429}
2430
2431} // namespace futures
2432
2433template <class Clock>
2434Future<Unit> Timekeeper::at(std::chrono::time_point<Clock> when) {
2435 auto now = Clock::now();
2436
2437 if (when <= now) {
2438 return makeFuture();
2439 }
2440
2441 return after(std::chrono::duration_cast<Duration>(when - now));
2442}
2443
// Explicit instantiation declarations for the most common Future types.
// `extern template` suppresses implicit instantiation of these
// specializations in translation units that include this header, which saves
// compile time. The matching explicit instantiation definitions are expected
// to live in a source file that is not visible from here.
extern template class Future<Unit>;
extern template class Future<bool>;
extern template class Future<int>;
extern template class Future<int64_t>;
extern template class Future<std::string>;
extern template class Future<double>;
2451} // namespace folly
2452