1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#include <folly/futures/ThreadWheelTimekeeper.h>
17
18#include <folly/Singleton.h>
19#include <folly/futures/Future.h>
20#include <future>
21
22namespace folly {
23
24namespace {
// folly::Singleton registration for the ThreadWheelTimekeeper instance that
// detail::getTimekeeperSingleton() hands out via try_get().
Singleton<ThreadWheelTimekeeper> timekeeperSingleton_;
26
27// Our Callback object for HHWheelTimer
28struct WTCallback : public std::enable_shared_from_this<WTCallback>,
29 public folly::HHWheelTimer::Callback {
30 struct PrivateConstructorTag {};
31
32 public:
33 WTCallback(PrivateConstructorTag, EventBase* base) : base_(base) {}
34
35 // Only allow creation by this factory, to ensure heap allocation.
36 static std::shared_ptr<WTCallback> create(EventBase* base) {
37 // optimization opportunity: memory pool
38 auto cob = std::make_shared<WTCallback>(PrivateConstructorTag{}, base);
39 // Capture shared_ptr of cob in lambda so that Core inside Promise will
40 // hold a ref count to it. The ref count will be released when Core goes
41 // away which happens when both Promise and Future go away
42 cob->promise_.setInterruptHandler(
43 [cob](exception_wrapper ew) { cob->interruptHandler(std::move(ew)); });
44 return cob;
45 }
46
47 Future<Unit> getFuture() {
48 return promise_.getFuture();
49 }
50
51 FOLLY_NODISCARD Promise<Unit> stealPromise() {
52 // Don't need promise anymore. Break the circular reference as promise_
53 // is holding a ref count to us via Core. Core won't go away until both
54 // Promise and Future go away.
55 return std::move(promise_);
56 }
57
58 protected:
59 folly::Synchronized<EventBase*> base_;
60 Promise<Unit> promise_;
61
62 void timeoutExpired() noexcept override {
63 base_ = nullptr;
64 // Don't need Promise anymore, break the circular reference
65 auto promise = stealPromise();
66 if (!promise.isFulfilled()) {
67 promise.setValue();
68 }
69 }
70
71 void callbackCanceled() noexcept override {
72 base_ = nullptr;
73 // Don't need Promise anymore, break the circular reference
74 auto promise = stealPromise();
75 if (!promise.isFulfilled()) {
76 promise.setException(FutureNoTimekeeper{});
77 }
78 }
79
80 void interruptHandler(exception_wrapper ew) {
81 auto rBase = base_.rlock();
82 if (!*rBase) {
83 return;
84 }
85 // Capture shared_ptr of self in lambda, if we don't do this, object
86 // may go away before the lambda is executed from event base thread.
87 // This is not racing with timeoutExpired anymore because this is called
88 // through Future, which means Core is still alive and keeping a ref count
89 // on us, so what timeouExpired is doing won't make the object go away
90 (*rBase)->runInEventBaseThread(
91 [me = shared_from_this(), ew = std::move(ew)]() mutable {
92 me->cancelTimeout();
93 // Don't need Promise anymore, break the circular reference
94 auto promise = me->stealPromise();
95 if (!promise.isFulfilled()) {
96 promise.setException(std::move(ew));
97 }
98 });
99 }
100};
101
102} // namespace
103
104ThreadWheelTimekeeper::ThreadWheelTimekeeper()
105 : thread_([this] { eventBase_.loopForever(); }),
106 wheelTimer_(
107 HHWheelTimer::newTimer(&eventBase_, std::chrono::milliseconds(1))) {
108 eventBase_.waitUntilRunning();
109 eventBase_.runInEventBaseThread([this] {
110 // 15 characters max
111 eventBase_.setName("FutureTimekeepr");
112 });
113}
114
115ThreadWheelTimekeeper::~ThreadWheelTimekeeper() {
116 eventBase_.runInEventBaseThreadAndWait([this] {
117 wheelTimer_->cancelAll();
118 eventBase_.terminateLoopSoon();
119 });
120 thread_.join();
121}
122
123Future<Unit> ThreadWheelTimekeeper::after(Duration dur) {
124 auto cob = WTCallback::create(&eventBase_);
125 auto f = cob->getFuture();
126 //
127 // Even shared_ptr of cob is captured in lambda this is still somewhat *racy*
128 // because it will be released once timeout is scheduled. So technically there
129 // is no gurantee that EventBase thread can safely call timeout callback.
130 // However due to fact that we are having circular reference here:
131 // WTCallback->Promise->Core->WTCallbak, so three of them won't go away until
132 // we break the circular reference. The break happens either in
133 // WTCallback::timeoutExpired or WTCallback::interruptHandler. Former means
134 // timeout callback is being safely executed. Latter captures shared_ptr of
135 // WTCallback again in another lambda for canceling timeout. The moment
136 // canceling timeout is executed in EventBase thread, the actual timeout
137 // callback has either been executed, or will never be executed. So we are
138 // fine here.
139 //
140 if (!eventBase_.runInEventBaseThread(
141 [this, cob, dur] { wheelTimer_->scheduleTimeout(cob.get(), dur); })) {
142 // Release promise to break the circular reference. Because if
143 // scheduleTimeout fails, there is nothing to *promise*. Internally
144 // Core would automatically set an exception result when Promise is
145 // destructed before fulfilling.
146 // This is either called from EventBase thread, or here.
147 // They are somewhat racy but given the rare chance this could fail,
148 // I don't see it is introducing any problem yet.
149 auto promise = cob->stealPromise();
150 if (!promise.isFulfilled()) {
151 promise.setException(FutureNoTimekeeper{});
152 }
153 }
154 return f;
155}
156
157namespace detail {
158
159std::shared_ptr<Timekeeper> getTimekeeperSingleton() {
160 return timekeeperSingleton_.try_get();
161}
162
163} // namespace detail
164
165} // namespace folly
166