1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/Likely.h> |
20 | #include <folly/detail/Futex.h> |
21 | #include <folly/detail/MemoryIdler.h> |
22 | #include <folly/portability/Asm.h> |
23 | #include <folly/synchronization/WaitOptions.h> |
24 | #include <folly/synchronization/detail/Spin.h> |
25 | |
26 | #include <glog/logging.h> |
27 | |
28 | #include <atomic> |
29 | |
30 | namespace folly { |
31 | |
32 | /// SaturatingSemaphore is a flag that allows concurrent posting by |
33 | /// multiple posters and concurrent non-destructive waiting by |
34 | /// multiple waiters. |
35 | /// |
36 | /// A SaturatingSemaphore allows one or more waiter threads to check, |
37 | /// spin, or block, indefinitely or with timeout, for a flag to be set |
38 | /// by one or more poster threads. By setting the flag, posters |
39 | /// announce to waiters (that may be already waiting or will check |
40 | /// the flag in the future) that some condition is true. Posts to an |
41 | /// already set flag are idempotent. |
42 | /// |
43 | /// SaturatingSemaphore is called so because it behaves like a hybrid |
44 | /// binary/counted _semaphore_ with values zero and infinity, and |
45 | /// post() and wait() functions. It is called _saturating_ because one |
46 | /// post() is enough to set it to infinity and to satisfy any number |
47 | /// of wait()-s. Once set (to infinity) it remains unchanged by |
48 | /// subsequent post()-s and wait()-s, until it is reset() back to |
49 | /// zero. |
50 | /// |
51 | /// The implementation of SaturatingSemaphore is based on that of |
52 | /// Baton. It includes no internal padding, and is only 4 bytes in |
53 | /// size. Any alignment or padding to avoid false sharing is up to |
54 | /// the user. |
55 | /// SaturatingSemaphore differs from Baton as follows: |
/// - Baton allows at most one call to post(); this allows any number
///   of calls, including concurrent ones.
/// - Baton allows at most one successful call to any wait variant;
///   this allows any number of successful calls, including concurrent ones.
60 | /// |
61 | /// Template parameter: |
62 | /// - bool MayBlock: If false, waiting operations spin only. If |
63 | /// true, timed and wait operations may block; adds an atomic |
64 | /// instruction to the critical path of posters. |
65 | /// |
66 | /// Wait options: |
/// WaitOptions contains an optional per-call setting for the spin-max duration:
68 | /// Calls to wait(), try_wait_until(), and try_wait_for() block only after the |
69 | /// passage of the spin-max period. The default spin-max duration is 10 usec. |
70 | /// The spin-max option is applicable only if MayBlock is true. |
71 | /// |
72 | /// Functions: |
73 | /// bool ready(): |
74 | /// Returns true if the flag is set by a call to post, otherwise false. |
75 | /// Equivalent to try_wait, but available on const receivers. |
76 | /// void reset(); |
77 | /// Clears the flag. |
78 | /// void post(); |
79 | /// Sets the flag and wakes all current waiters, i.e., causes all |
80 | /// concurrent calls to wait, try_wait_for, and try_wait_until to |
81 | /// return. |
///   void wait(
///       const WaitOptions& opt = wait_options());
///     Waits for the flag to be set by a call to post.
///   bool try_wait();
///     Returns true if the flag is set by a call to post, otherwise false.
///   bool try_wait_until(
///       const time_point& deadline,
///       const WaitOptions& opt = wait_options());
///     Returns true if the flag is set by a call to post before the
///     deadline, otherwise false.
///   bool try_wait_for(
///       const duration& duration,
///       const WaitOptions& opt = wait_options());
///     Returns true if the flag is set by a call to post before the
///     expiration of the specified duration, otherwise false.
97 | /// |
98 | /// Usage: |
99 | /// @code |
100 | /// SaturatingSemaphore</* MayBlock = */ true> f; |
101 | /// ASSERT_FALSE(f.try_wait()); |
102 | /// ASSERT_FALSE(f.try_wait_until( |
103 | /// std::chrono::steady_clock::now() + std::chrono::microseconds(1))); |
104 | /// ASSERT_FALSE(f.try_wait_until( |
105 | /// std::chrono::steady_clock::now() + std::chrono::microseconds(1), |
106 | /// f.wait_options().spin_max(std::chrono::microseconds(1)))); |
107 | /// f.post(); |
108 | /// f.post(); |
109 | /// f.wait(); |
110 | /// f.wait(f.wait_options().spin_max(std::chrono::nanoseconds(100))); |
111 | /// ASSERT_TRUE(f.try_wait()); |
112 | /// ASSERT_TRUE(f.try_wait_until( |
113 | /// std::chrono::steady_clock::now() + std::chrono::microseconds(1))); |
114 | /// f.wait(); |
115 | /// f.reset(); |
116 | /// ASSERT_FALSE(f.try_wait()); |
117 | /// @endcode |
118 | |
119 | template <bool MayBlock, template <typename> class Atom = std::atomic> |
120 | class SaturatingSemaphore { |
121 | detail::Futex<Atom> state_; |
122 | |
123 | enum State : uint32_t { |
124 | NOTREADY = 0, |
125 | READY = 1, |
126 | BLOCKED = 2, |
127 | }; |
128 | |
129 | public: |
130 | FOLLY_ALWAYS_INLINE static constexpr WaitOptions wait_options() { |
131 | return {}; |
132 | } |
133 | |
134 | /** constructor */ |
135 | constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {} |
136 | |
137 | /** destructor */ |
138 | ~SaturatingSemaphore() {} |
139 | |
140 | /** ready */ |
141 | FOLLY_ALWAYS_INLINE bool ready() const noexcept { |
142 | return state_.load(std::memory_order_acquire) == READY; |
143 | } |
144 | |
145 | /** reset */ |
146 | void reset() noexcept { |
147 | state_.store(NOTREADY, std::memory_order_relaxed); |
148 | } |
149 | |
150 | /** post */ |
151 | FOLLY_ALWAYS_INLINE void post() noexcept { |
152 | if (!MayBlock) { |
153 | state_.store(READY, std::memory_order_release); |
154 | } else { |
155 | postFastWaiterMayBlock(); |
156 | } |
157 | } |
158 | |
159 | /** wait */ |
160 | FOLLY_ALWAYS_INLINE |
161 | void wait(const WaitOptions& opt = wait_options()) noexcept { |
162 | try_wait_until(std::chrono::steady_clock::time_point::max(), opt); |
163 | } |
164 | |
165 | /** try_wait */ |
166 | FOLLY_ALWAYS_INLINE bool try_wait() noexcept { |
167 | return ready(); |
168 | } |
169 | |
170 | /** try_wait_until */ |
171 | template <typename Clock, typename Duration> |
172 | FOLLY_ALWAYS_INLINE bool try_wait_until( |
173 | const std::chrono::time_point<Clock, Duration>& deadline, |
174 | const WaitOptions& opt = wait_options()) noexcept { |
175 | if (LIKELY(try_wait())) { |
176 | return true; |
177 | } |
178 | return tryWaitSlow(deadline, opt); |
179 | } |
180 | |
181 | /** try_wait_for */ |
182 | template <class Rep, class Period> |
183 | FOLLY_ALWAYS_INLINE bool try_wait_for( |
184 | const std::chrono::duration<Rep, Period>& duration, |
185 | const WaitOptions& opt = wait_options()) noexcept { |
186 | if (LIKELY(try_wait())) { |
187 | return true; |
188 | } |
189 | auto deadline = std::chrono::steady_clock::now() + duration; |
190 | return tryWaitSlow(deadline, opt); |
191 | } |
192 | |
193 | private: |
194 | FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept { |
195 | uint32_t before = NOTREADY; |
196 | if (LIKELY(state_.compare_exchange_strong( |
197 | before, |
198 | READY, |
199 | std::memory_order_release, |
200 | std::memory_order_relaxed))) { |
201 | return; |
202 | } |
203 | postSlowWaiterMayBlock(before); |
204 | } |
205 | |
206 | void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below |
207 | |
208 | template <typename Clock, typename Duration> |
209 | bool tryWaitSlow( |
210 | const std::chrono::time_point<Clock, Duration>& deadline, |
211 | const WaitOptions& opt) noexcept; // defined below |
212 | }; |
213 | |
214 | /// |
/// Member function definitions
216 | /// |
217 | |
/** postSlowWaiterMayBlock */
/// Slow path of post() for MayBlock == true, entered when the fast-path CAS
/// NOTREADY -> READY in postFastWaiterMayBlock() failed. `before` is the
/// state value that failed CAS observed. Loops until this call either
/// installs READY itself or confirms (after a seq_cst fence) that the state
/// is READY. If it replaces BLOCKED with READY, it also wakes the futex
/// waiters.
template <bool MayBlock, template <typename> class Atom>
FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
    uint32_t before) noexcept {
  while (true) {
    if (before == NOTREADY) {
      // Reached after the READY branch below re-reads NOTREADY (e.g. a
      // concurrent reset()): retry the plain NOTREADY -> READY transition.
      // On failure `before` is refreshed with the current state.
      if (state_.compare_exchange_strong(
              before,
              READY,
              std::memory_order_release,
              std::memory_order_relaxed)) {
        return;
      }
    }
    if (before == READY) { // Only if multiple posters
      // The reason for not simply returning (without the following
      // steps) is to prevent the following case:
      //
      //   T1:                T2:                T3:
      //   local1.post();     local2.post();     global.wait();
      //   global.post();     global.post();     global.reset();
      //                                         seq_cst fence
      //                                         local1.try_wait() == true;
      //                                         local2.try_wait() == false;
      //
      // The following steps correspond to T2's global.post(), where
      // global is already posted by T1.
      //
      // The following fence and load guarantee that T3 does not miss
      // T2's prior stores, i.e., local2.post() in this example.
      //
      // The following case is prevented:
      //
      // Starting with local2 == NOTREADY and global == READY
      //
      //   T2:                               T3:
      //   store READY to local2 // post     store NOTREADY to global // reset
      //   seq_cst fence                     seq_cst fence
      //   load READY from global // post    load NOTREADY from local2 // try_wait
      //
      std::atomic_thread_fence(std::memory_order_seq_cst);
      before = state_.load(std::memory_order_relaxed);
      if (before == READY) {
        return;
      }
      // The state changed after the fence (NOTREADY or BLOCKED); re-dispatch.
      continue;
    }
    // Any other value means a waiter announced that it may block.
    DCHECK_EQ(before, BLOCKED);
    // Publish READY, then wake the waiters blocked on the futex. If the CAS
    // fails, `before` holds the new state and the loop re-dispatches on it.
    if (state_.compare_exchange_strong(
            before,
            READY,
            std::memory_order_release,
            std::memory_order_relaxed)) {
      detail::futexWake(&state_);
      return;
    }
  }
}
276 | |
277 | /** tryWaitSlow */ |
278 | template <bool MayBlock, template <typename> class Atom> |
279 | template <typename Clock, typename Duration> |
280 | FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow( |
281 | const std::chrono::time_point<Clock, Duration>& deadline, |
282 | const WaitOptions& opt) noexcept { |
283 | switch (detail::spin_pause_until(deadline, opt, [=] { return ready(); })) { |
284 | case detail::spin_result::success: |
285 | return true; |
286 | case detail::spin_result::timeout: |
287 | return false; |
288 | case detail::spin_result::advance: |
289 | break; |
290 | } |
291 | |
292 | if (!MayBlock) { |
293 | switch (detail::spin_yield_until(deadline, [=] { return ready(); })) { |
294 | case detail::spin_result::success: |
295 | return true; |
296 | case detail::spin_result::timeout: |
297 | return false; |
298 | case detail::spin_result::advance: |
299 | break; |
300 | } |
301 | } |
302 | |
303 | auto before = state_.load(std::memory_order_relaxed); |
304 | while (before == NOTREADY && |
305 | !state_.compare_exchange_strong( |
306 | before, |
307 | BLOCKED, |
308 | std::memory_order_relaxed, |
309 | std::memory_order_relaxed)) { |
310 | if (before == READY) { |
311 | // TODO: move the acquire to the compare_exchange failure load after C++17 |
312 | std::atomic_thread_fence(std::memory_order_acquire); |
313 | return true; |
314 | } |
315 | } |
316 | |
317 | while (true) { |
318 | auto rv = detail::MemoryIdler::futexWaitUntil(state_, BLOCKED, deadline); |
319 | if (rv == detail::FutexResult::TIMEDOUT) { |
320 | assert(deadline != (std::chrono::time_point<Clock, Duration>::max())); |
321 | return false; |
322 | } |
323 | |
324 | if (ready()) { |
325 | return true; |
326 | } |
327 | } |
328 | } |
329 | |
330 | } // namespace folly |
331 | |