1 | /* |
2 | * Copyright 2014-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | #include <folly/futures/ThreadWheelTimekeeper.h> |
17 | |
18 | #include <folly/Singleton.h> |
19 | #include <folly/futures/Future.h> |
20 | #include <future> |
21 | |
22 | namespace folly { |
23 | |
24 | namespace { |
// File-local folly::Singleton holder for ThreadWheelTimekeeper. It is read
// through detail::getTimekeeperSingleton(), which calls try_get() on it.
Singleton<ThreadWheelTimekeeper> timekeeperSingleton_;
26 | |
27 | // Our Callback object for HHWheelTimer |
28 | struct WTCallback : public std::enable_shared_from_this<WTCallback>, |
29 | public folly::HHWheelTimer::Callback { |
30 | struct PrivateConstructorTag {}; |
31 | |
32 | public: |
33 | WTCallback(PrivateConstructorTag, EventBase* base) : base_(base) {} |
34 | |
35 | // Only allow creation by this factory, to ensure heap allocation. |
36 | static std::shared_ptr<WTCallback> create(EventBase* base) { |
37 | // optimization opportunity: memory pool |
38 | auto cob = std::make_shared<WTCallback>(PrivateConstructorTag{}, base); |
39 | // Capture shared_ptr of cob in lambda so that Core inside Promise will |
40 | // hold a ref count to it. The ref count will be released when Core goes |
41 | // away which happens when both Promise and Future go away |
42 | cob->promise_.setInterruptHandler( |
43 | [cob](exception_wrapper ew) { cob->interruptHandler(std::move(ew)); }); |
44 | return cob; |
45 | } |
46 | |
47 | Future<Unit> getFuture() { |
48 | return promise_.getFuture(); |
49 | } |
50 | |
51 | FOLLY_NODISCARD Promise<Unit> stealPromise() { |
52 | // Don't need promise anymore. Break the circular reference as promise_ |
53 | // is holding a ref count to us via Core. Core won't go away until both |
54 | // Promise and Future go away. |
55 | return std::move(promise_); |
56 | } |
57 | |
58 | protected: |
59 | folly::Synchronized<EventBase*> base_; |
60 | Promise<Unit> promise_; |
61 | |
62 | void timeoutExpired() noexcept override { |
63 | base_ = nullptr; |
64 | // Don't need Promise anymore, break the circular reference |
65 | auto promise = stealPromise(); |
66 | if (!promise.isFulfilled()) { |
67 | promise.setValue(); |
68 | } |
69 | } |
70 | |
71 | void callbackCanceled() noexcept override { |
72 | base_ = nullptr; |
73 | // Don't need Promise anymore, break the circular reference |
74 | auto promise = stealPromise(); |
75 | if (!promise.isFulfilled()) { |
76 | promise.setException(FutureNoTimekeeper{}); |
77 | } |
78 | } |
79 | |
80 | void interruptHandler(exception_wrapper ew) { |
81 | auto rBase = base_.rlock(); |
82 | if (!*rBase) { |
83 | return; |
84 | } |
85 | // Capture shared_ptr of self in lambda, if we don't do this, object |
86 | // may go away before the lambda is executed from event base thread. |
87 | // This is not racing with timeoutExpired anymore because this is called |
88 | // through Future, which means Core is still alive and keeping a ref count |
89 | // on us, so what timeouExpired is doing won't make the object go away |
90 | (*rBase)->runInEventBaseThread( |
91 | [me = shared_from_this(), ew = std::move(ew)]() mutable { |
92 | me->cancelTimeout(); |
93 | // Don't need Promise anymore, break the circular reference |
94 | auto promise = me->stealPromise(); |
95 | if (!promise.isFulfilled()) { |
96 | promise.setException(std::move(ew)); |
97 | } |
98 | }); |
99 | } |
100 | }; |
101 | |
102 | } // namespace |
103 | |
104 | ThreadWheelTimekeeper::ThreadWheelTimekeeper() |
105 | : thread_([this] { eventBase_.loopForever(); }), |
106 | wheelTimer_( |
107 | HHWheelTimer::newTimer(&eventBase_, std::chrono::milliseconds(1))) { |
108 | eventBase_.waitUntilRunning(); |
109 | eventBase_.runInEventBaseThread([this] { |
110 | // 15 characters max |
111 | eventBase_.setName("FutureTimekeepr" ); |
112 | }); |
113 | } |
114 | |
115 | ThreadWheelTimekeeper::~ThreadWheelTimekeeper() { |
116 | eventBase_.runInEventBaseThreadAndWait([this] { |
117 | wheelTimer_->cancelAll(); |
118 | eventBase_.terminateLoopSoon(); |
119 | }); |
120 | thread_.join(); |
121 | } |
122 | |
123 | Future<Unit> ThreadWheelTimekeeper::after(Duration dur) { |
124 | auto cob = WTCallback::create(&eventBase_); |
125 | auto f = cob->getFuture(); |
126 | // |
127 | // Even shared_ptr of cob is captured in lambda this is still somewhat *racy* |
128 | // because it will be released once timeout is scheduled. So technically there |
129 | // is no gurantee that EventBase thread can safely call timeout callback. |
130 | // However due to fact that we are having circular reference here: |
131 | // WTCallback->Promise->Core->WTCallbak, so three of them won't go away until |
132 | // we break the circular reference. The break happens either in |
133 | // WTCallback::timeoutExpired or WTCallback::interruptHandler. Former means |
134 | // timeout callback is being safely executed. Latter captures shared_ptr of |
135 | // WTCallback again in another lambda for canceling timeout. The moment |
136 | // canceling timeout is executed in EventBase thread, the actual timeout |
137 | // callback has either been executed, or will never be executed. So we are |
138 | // fine here. |
139 | // |
140 | if (!eventBase_.runInEventBaseThread( |
141 | [this, cob, dur] { wheelTimer_->scheduleTimeout(cob.get(), dur); })) { |
142 | // Release promise to break the circular reference. Because if |
143 | // scheduleTimeout fails, there is nothing to *promise*. Internally |
144 | // Core would automatically set an exception result when Promise is |
145 | // destructed before fulfilling. |
146 | // This is either called from EventBase thread, or here. |
147 | // They are somewhat racy but given the rare chance this could fail, |
148 | // I don't see it is introducing any problem yet. |
149 | auto promise = cob->stealPromise(); |
150 | if (!promise.isFulfilled()) { |
151 | promise.setException(FutureNoTimekeeper{}); |
152 | } |
153 | } |
154 | return f; |
155 | } |
156 | |
157 | namespace detail { |
158 | |
159 | std::shared_ptr<Timekeeper> getTimekeeperSingleton() { |
160 | return timekeeperSingleton_.try_get(); |
161 | } |
162 | |
163 | } // namespace detail |
164 | |
165 | } // namespace folly |
166 | |