1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <folly/Likely.h>
20#include <folly/detail/Futex.h>
21#include <folly/detail/MemoryIdler.h>
22#include <folly/portability/Asm.h>
23#include <folly/synchronization/WaitOptions.h>
24#include <folly/synchronization/detail/Spin.h>
25
26#include <glog/logging.h>
27
28#include <atomic>
29
30namespace folly {
31
32/// SaturatingSemaphore is a flag that allows concurrent posting by
33/// multiple posters and concurrent non-destructive waiting by
34/// multiple waiters.
35///
36/// A SaturatingSemaphore allows one or more waiter threads to check,
37/// spin, or block, indefinitely or with timeout, for a flag to be set
38/// by one or more poster threads. By setting the flag, posters
39/// announce to waiters (that may be already waiting or will check
40/// the flag in the future) that some condition is true. Posts to an
41/// already set flag are idempotent.
42///
43/// SaturatingSemaphore is called so because it behaves like a hybrid
44/// binary/counted _semaphore_ with values zero and infinity, and
45/// post() and wait() functions. It is called _saturating_ because one
46/// post() is enough to set it to infinity and to satisfy any number
47/// of wait()-s. Once set (to infinity) it remains unchanged by
48/// subsequent post()-s and wait()-s, until it is reset() back to
49/// zero.
50///
51/// The implementation of SaturatingSemaphore is based on that of
52/// Baton. It includes no internal padding, and is only 4 bytes in
53/// size. Any alignment or padding to avoid false sharing is up to
54/// the user.
55/// SaturatingSemaphore differs from Baton as follows:
56/// - Baton allows at most one call to post(); this allows any number
57/// and concurrently.
58/// - Baton allows at most one successful call to any wait variant;
59/// this allows any number and concurrently.
60///
61/// Template parameter:
62/// - bool MayBlock: If false, waiting operations spin only. If
63/// true, timed and wait operations may block; adds an atomic
64/// instruction to the critical path of posters.
65///
66/// Wait options:
67/// WaitOptions contains optional per call setting for spin-max duration:
68/// Calls to wait(), try_wait_until(), and try_wait_for() block only after the
69/// passage of the spin-max period. The default spin-max duration is 10 usec.
70/// The spin-max option is applicable only if MayBlock is true.
71///
72/// Functions:
73/// bool ready():
74/// Returns true if the flag is set by a call to post, otherwise false.
75/// Equivalent to try_wait, but available on const receivers.
76/// void reset();
77/// Clears the flag.
78/// void post();
79/// Sets the flag and wakes all current waiters, i.e., causes all
80/// concurrent calls to wait, try_wait_for, and try_wait_until to
81/// return.
82/// void wait(
83/// WaitOptions opt = wait_options());
84/// Waits for the flag to be set by a call to post.
85/// bool try_wait();
86/// Returns true if the flag is set by a call to post, otherwise false.
87/// bool try_wait_until(
88/// time_point& deadline,
89/// WaitOptions& = wait_options());
90/// Returns true if the flag is set by a call to post before the
91/// deadline, otherwise false.
92/// bool try_wait_for(
93/// duration&,
94/// WaitOptions& = wait_options());
95/// Returns true if the flag is set by a call to post before the
96/// expiration of the specified duration, otherwise false.
97///
98/// Usage:
99/// @code
100/// SaturatingSemaphore</* MayBlock = */ true> f;
101/// ASSERT_FALSE(f.try_wait());
102/// ASSERT_FALSE(f.try_wait_until(
103/// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
104/// ASSERT_FALSE(f.try_wait_until(
105/// std::chrono::steady_clock::now() + std::chrono::microseconds(1),
106/// f.wait_options().spin_max(std::chrono::microseconds(1))));
107/// f.post();
108/// f.post();
109/// f.wait();
110/// f.wait(f.wait_options().spin_max(std::chrono::nanoseconds(100)));
111/// ASSERT_TRUE(f.try_wait());
112/// ASSERT_TRUE(f.try_wait_until(
113/// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
114/// f.wait();
115/// f.reset();
116/// ASSERT_FALSE(f.try_wait());
117/// @endcode
118
119template <bool MayBlock, template <typename> class Atom = std::atomic>
120class SaturatingSemaphore {
121 detail::Futex<Atom> state_;
122
123 enum State : uint32_t {
124 NOTREADY = 0,
125 READY = 1,
126 BLOCKED = 2,
127 };
128
129 public:
130 FOLLY_ALWAYS_INLINE static constexpr WaitOptions wait_options() {
131 return {};
132 }
133
134 /** constructor */
135 constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
136
137 /** destructor */
138 ~SaturatingSemaphore() {}
139
140 /** ready */
141 FOLLY_ALWAYS_INLINE bool ready() const noexcept {
142 return state_.load(std::memory_order_acquire) == READY;
143 }
144
145 /** reset */
146 void reset() noexcept {
147 state_.store(NOTREADY, std::memory_order_relaxed);
148 }
149
150 /** post */
151 FOLLY_ALWAYS_INLINE void post() noexcept {
152 if (!MayBlock) {
153 state_.store(READY, std::memory_order_release);
154 } else {
155 postFastWaiterMayBlock();
156 }
157 }
158
159 /** wait */
160 FOLLY_ALWAYS_INLINE
161 void wait(const WaitOptions& opt = wait_options()) noexcept {
162 try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
163 }
164
165 /** try_wait */
166 FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
167 return ready();
168 }
169
170 /** try_wait_until */
171 template <typename Clock, typename Duration>
172 FOLLY_ALWAYS_INLINE bool try_wait_until(
173 const std::chrono::time_point<Clock, Duration>& deadline,
174 const WaitOptions& opt = wait_options()) noexcept {
175 if (LIKELY(try_wait())) {
176 return true;
177 }
178 return tryWaitSlow(deadline, opt);
179 }
180
181 /** try_wait_for */
182 template <class Rep, class Period>
183 FOLLY_ALWAYS_INLINE bool try_wait_for(
184 const std::chrono::duration<Rep, Period>& duration,
185 const WaitOptions& opt = wait_options()) noexcept {
186 if (LIKELY(try_wait())) {
187 return true;
188 }
189 auto deadline = std::chrono::steady_clock::now() + duration;
190 return tryWaitSlow(deadline, opt);
191 }
192
193 private:
194 FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
195 uint32_t before = NOTREADY;
196 if (LIKELY(state_.compare_exchange_strong(
197 before,
198 READY,
199 std::memory_order_release,
200 std::memory_order_relaxed))) {
201 return;
202 }
203 postSlowWaiterMayBlock(before);
204 }
205
206 void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
207
208 template <typename Clock, typename Duration>
209 bool tryWaitSlow(
210 const std::chrono::time_point<Clock, Duration>& deadline,
211 const WaitOptions& opt) noexcept; // defined below
212};
213
214///
215/// Member function definitioons
216///
217
/** postSlowWaiterMayBlock */
// Slow path of post(), reached when the fast CAS in
// postFastWaiterMayBlock() failed, i.e., the state was not NOTREADY.
template <bool MayBlock, template <typename> class Atom>
FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
    uint32_t before) noexcept {
  while (true) {
    if (before == NOTREADY) {
      // State went back to NOTREADY (presumably a racing reset();
      // reachable only on later loop iterations) — retry the plain
      // NOTREADY -> READY transition.
      if (state_.compare_exchange_strong(
              before,
              READY,
              std::memory_order_release,
              std::memory_order_relaxed)) {
        return;
      }
    }
    if (before == READY) { // Only if multiple posters
      // The reason for not simply returning (without the following
      // steps) is to prevent the following case:
      //
      //   T1:             T2:             T3:
      //   local1.post();  local2.post();  global.wait();
      //   global.post();  global.post();  global.reset();
      //                                   seq_cst fence
      //                                   local1.try_wait() == true;
      //                                   local2.try_wait() == false;
      //
      // The following steps correspond to T2's global.post(), where
      // global is already posted by T1.
      //
      // The following fence and load guarantee that T3 does not miss
      // T2's prior stores, i.e., local2.post() in this example.
      //
      // The following case is prevented:
      //
      // Starting with local2 == NOTREADY and global == READY
      //
      //   T2:                              T3:
      //   store READY to local2 // post    store NOTREADY to global // reset
      //   seq_cst fence                    seq_cst fence
      //   load READY from global // post   load NOTREADY from local2 // try_wait
      //
      std::atomic_thread_fence(std::memory_order_seq_cst);
      before = state_.load(std::memory_order_relaxed);
      if (before == READY) {
        return;
      }
      // State changed under us (reset and/or new waiters) — start over.
      continue;
    }
    DCHECK_EQ(before, BLOCKED);
    // Waiters are (or may be) blocked on the futex: transition to READY
    // with release semantics, then wake them all.
    if (state_.compare_exchange_strong(
            before,
            READY,
            std::memory_order_release,
            std::memory_order_relaxed)) {
      detail::futexWake(&state_);
      return;
    }
  }
}
276
/** tryWaitSlow */
// Slow path of the wait variants: spin up to the spin-max option, then
// (if MayBlock) advertise a blocked waiter and sleep on the futex.
template <bool MayBlock, template <typename> class Atom>
template <typename Clock, typename Duration>
FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
    const std::chrono::time_point<Clock, Duration>& deadline,
    const WaitOptions& opt) noexcept {
  // Phase 1: spin with pause instructions until the flag is set, the
  // deadline passes, or the spin-max period elapses (advance).
  switch (detail::spin_pause_until(deadline, opt, [=] { return ready(); })) {
    case detail::spin_result::success:
      return true;
    case detail::spin_result::timeout:
      return false;
    case detail::spin_result::advance:
      break;
  }

  if (!MayBlock) {
    // Spin-only mode: keep checking but yield the CPU between checks;
    // this never blocks on the futex.
    switch (detail::spin_yield_until(deadline, [=] { return ready(); })) {
      case detail::spin_result::success:
        return true;
      case detail::spin_result::timeout:
        return false;
      case detail::spin_result::advance:
        break;
    }
  }

  // Phase 2 (MayBlock): announce a blocked waiter by moving the state
  // from NOTREADY to BLOCKED so posters know to call futexWake. If the
  // state is already BLOCKED (another waiter announced it), fall through.
  auto before = state_.load(std::memory_order_relaxed);
  while (before == NOTREADY &&
         !state_.compare_exchange_strong(
             before,
             BLOCKED,
             std::memory_order_relaxed,
             std::memory_order_relaxed)) {
    if (before == READY) {
      // A post() won the race; synchronize with its release store.
      // TODO: move the acquire to the compare_exchange failure load after C++17
      std::atomic_thread_fence(std::memory_order_acquire);
      return true;
    }
  }

  // Phase 3: block on the futex while the state is BLOCKED. MemoryIdler
  // may release idle thread resources before sleeping.
  while (true) {
    auto rv = detail::MemoryIdler::futexWaitUntil(state_, BLOCKED, deadline);
    if (rv == detail::FutexResult::TIMEDOUT) {
      // An indefinite wait (time_point::max) must never time out.
      assert(deadline != (std::chrono::time_point<Clock, Duration>::max()));
      return false;
    }

    // Woken or value changed: re-check the flag; ready() uses an
    // acquire load, pairing with the poster's release store.
    if (ready()) {
      return true;
    }
  }
}
329
330} // namespace folly
331