1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <algorithm> |
20 | #include <limits> |
21 | |
22 | #include <folly/detail/Futex.h> |
23 | #include <folly/portability/Asm.h> |
24 | #include <folly/portability/Unistd.h> |
25 | |
26 | #include <glog/logging.h> |
27 | |
28 | namespace folly { |
29 | |
30 | namespace detail { |
31 | |
32 | /// A TurnSequencer allows threads to order their execution according to |
33 | /// a monotonically increasing (with wraparound) "turn" value. The two |
34 | /// operations provided are to wait for turn T, and to move to the next |
35 | /// turn. Every thread that is waiting for T must have arrived before |
36 | /// that turn is marked completed (for MPMCQueue only one thread waits |
37 | /// for any particular turn, so this is trivially true). |
38 | /// |
39 | /// TurnSequencer's state_ holds 26 bits of the current turn (shifted |
40 | /// left by 6), along with a 6 bit saturating value that records the |
41 | /// maximum waiter minus the current turn. Wraparound of the turn space |
42 | /// is expected and handled. This allows us to atomically adjust the |
43 | /// number of outstanding waiters when we perform a FUTEX_WAKE operation. |
44 | /// Compare this strategy to sem_t's separate num_waiters field, which |
45 | /// isn't decremented until after the waiting thread gets scheduled, |
46 | /// during which time more enqueues might have occurred and made pointless |
47 | /// FUTEX_WAKE calls. |
48 | /// |
/// TurnSequencer uses futex() directly. It is optimized for the
/// case that the highest awaited turn is at most 32 turns ahead of the
/// current turn. We use the FUTEX_WAIT_BITSET variant, which lets
/// us embed 32 separate wakeup channels in a single futex. See
/// http://locklessinc.com/articles/futex_cheat_sheet for a description.
54 | /// |
55 | /// We only need to keep exact track of the delta between the current |
56 | /// turn and the maximum waiter for the 32 turns that follow the current |
57 | /// one, because waiters at turn t+32 will be awoken at turn t. At that |
58 | /// point they can then adjust the delta using the higher base. Since we |
59 | /// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits. |
60 | /// We actually store waiter deltas up to 63, since that might reduce |
61 | /// the number of CAS operations a tiny bit. |
62 | /// |
63 | /// To avoid some futex() calls entirely, TurnSequencer uses an adaptive |
64 | /// spin cutoff before waiting. The overheads (and convergence rate) |
65 | /// of separately tracking the spin cutoff for each TurnSequencer would |
66 | /// be prohibitive, so the actual storage is passed in as a parameter and |
67 | /// updated atomically. This also lets the caller use different adaptive |
68 | /// cutoffs for different operations (read versus write, for example). |
69 | /// To avoid contention, the spin cutoff is only updated when requested |
70 | /// by the caller. |
71 | template <template <typename> class Atom> |
72 | struct TurnSequencer { |
73 | explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept |
74 | : state_(encode(firstTurn << kTurnShift, 0)) {} |
75 | |
76 | /// Returns true iff a call to waitForTurn(turn, ...) won't block |
77 | bool isTurn(const uint32_t turn) const noexcept { |
78 | auto state = state_.load(std::memory_order_acquire); |
79 | return decodeCurrentSturn(state) == (turn << kTurnShift); |
80 | } |
81 | |
82 | enum class TryWaitResult { SUCCESS, PAST, TIMEDOUT }; |
83 | |
84 | /// See tryWaitForTurn |
85 | /// Requires that `turn` is not a turn in the past. |
86 | void waitForTurn( |
87 | const uint32_t turn, |
88 | Atom<uint32_t>& spinCutoff, |
89 | const bool updateSpinCutoff) noexcept { |
90 | const auto ret = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff); |
91 | DCHECK(ret == TryWaitResult::SUCCESS); |
92 | } |
93 | |
94 | // Internally we always work with shifted turn values, which makes the |
95 | // truncation and wraparound work correctly. This leaves us bits at |
96 | // the bottom to store the number of waiters. We call shifted turns |
97 | // "sturns" inside this class. |
98 | |
99 | /// Blocks the current thread until turn has arrived. If |
100 | /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries |
101 | /// before blocking and will adjust spinCutoff based on the results, |
102 | /// otherwise it will spin for at most spinCutoff spins. |
103 | /// Returns SUCCESS if the wait succeeded, PAST if the turn is in the past |
104 | /// or TIMEDOUT if the absTime time value is not nullptr and is reached before |
105 | /// the turn arrives |
106 | template < |
107 | class Clock = std::chrono::steady_clock, |
108 | class Duration = typename Clock::duration> |
109 | TryWaitResult tryWaitForTurn( |
110 | const uint32_t turn, |
111 | Atom<uint32_t>& spinCutoff, |
112 | const bool updateSpinCutoff, |
113 | const std::chrono::time_point<Clock, Duration>* absTime = |
114 | nullptr) noexcept { |
115 | uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed); |
116 | const uint32_t effectiveSpinCutoff = |
117 | updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh; |
118 | |
119 | uint32_t tries; |
120 | const uint32_t sturn = turn << kTurnShift; |
121 | for (tries = 0;; ++tries) { |
122 | uint32_t state = state_.load(std::memory_order_acquire); |
123 | uint32_t current_sturn = decodeCurrentSturn(state); |
124 | if (current_sturn == sturn) { |
125 | break; |
126 | } |
127 | |
128 | // wrap-safe version of (current_sturn >= sturn) |
129 | if (sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) { |
130 | // turn is in the past |
131 | return TryWaitResult::PAST; |
132 | } |
133 | |
134 | // the first effectSpinCutoff tries are spins, after that we will |
135 | // record ourself as a waiter and block with futexWait |
136 | if (tries < effectiveSpinCutoff) { |
137 | asm_volatile_pause(); |
138 | continue; |
139 | } |
140 | |
141 | uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state); |
142 | uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift; |
143 | uint32_t new_state; |
144 | if (our_waiter_delta <= current_max_waiter_delta) { |
145 | // state already records us as waiters, probably because this |
146 | // isn't our first time around this loop |
147 | new_state = state; |
148 | } else { |
149 | new_state = encode(current_sturn, our_waiter_delta); |
150 | if (state != new_state && |
151 | !state_.compare_exchange_strong(state, new_state)) { |
152 | continue; |
153 | } |
154 | } |
155 | if (absTime) { |
156 | auto futexResult = detail::futexWaitUntil( |
157 | &state_, new_state, *absTime, futexChannel(turn)); |
158 | if (futexResult == FutexResult::TIMEDOUT) { |
159 | return TryWaitResult::TIMEDOUT; |
160 | } |
161 | } else { |
162 | detail::futexWait(&state_, new_state, futexChannel(turn)); |
163 | } |
164 | } |
165 | |
166 | if (updateSpinCutoff || prevThresh == 0) { |
167 | // if we hit kMaxSpins then spinning was pointless, so the right |
168 | // spinCutoff is kMinSpins |
169 | uint32_t target; |
170 | if (tries >= kMaxSpins) { |
171 | target = kMinSpins; |
172 | } else { |
173 | // to account for variations, we allow ourself to spin 2*N when |
174 | // we think that N is actually required in order to succeed |
175 | target = std::min<uint32_t>( |
176 | kMaxSpins, std::max<uint32_t>(kMinSpins, tries * 2)); |
177 | } |
178 | |
179 | if (prevThresh == 0) { |
180 | // bootstrap |
181 | spinCutoff.store(target); |
182 | } else { |
183 | // try once, keep moving if CAS fails. Exponential moving average |
184 | // with alpha of 7/8 |
185 | // Be careful that the quantity we add to prevThresh is signed. |
186 | spinCutoff.compare_exchange_weak( |
187 | prevThresh, prevThresh + int(target - prevThresh) / 8); |
188 | } |
189 | } |
190 | |
191 | return TryWaitResult::SUCCESS; |
192 | } |
193 | |
194 | /// Unblocks a thread running waitForTurn(turn + 1) |
195 | void completeTurn(const uint32_t turn) noexcept { |
196 | uint32_t state = state_.load(std::memory_order_acquire); |
197 | while (true) { |
198 | DCHECK(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state))); |
199 | uint32_t max_waiter_delta = decodeMaxWaitersDelta(state); |
200 | uint32_t new_state = encode( |
201 | (turn + 1) << kTurnShift, |
202 | max_waiter_delta == 0 ? 0 : max_waiter_delta - 1); |
203 | if (state_.compare_exchange_strong(state, new_state)) { |
204 | if (max_waiter_delta != 0) { |
205 | detail::futexWake( |
206 | &state_, std::numeric_limits<int>::max(), futexChannel(turn + 1)); |
207 | } |
208 | break; |
209 | } |
210 | // failing compare_exchange_strong updates first arg to the value |
211 | // that caused the failure, so no need to reread state_ |
212 | } |
213 | } |
214 | |
215 | /// Returns the least-most significant byte of the current uncompleted |
216 | /// turn. The full 32 bit turn cannot be recovered. |
217 | uint8_t uncompletedTurnLSB() const noexcept { |
218 | return uint8_t(state_.load(std::memory_order_acquire) >> kTurnShift); |
219 | } |
220 | |
221 | private: |
222 | enum : uint32_t { |
223 | /// kTurnShift counts the bits that are stolen to record the delta |
224 | /// between the current turn and the maximum waiter. It needs to be big |
225 | /// enough to record wait deltas of 0 to 32 inclusive. Waiters more |
226 | /// than 32 in the future will be woken up 32*n turns early (since |
227 | /// their BITSET will hit) and will adjust the waiter count again. |
228 | /// We go a bit beyond and let the waiter count go up to 63, which |
229 | /// is free and might save us a few CAS |
230 | kTurnShift = 6, |
231 | kWaitersMask = (1 << kTurnShift) - 1, |
232 | |
233 | /// The minimum spin count that we will adaptively select |
234 | kMinSpins = 20, |
235 | |
236 | /// The maximum spin count that we will adaptively select, and the |
237 | /// spin count that will be used when probing to get a new data point |
238 | /// for the adaptation |
239 | kMaxSpins = 2000, |
240 | }; |
241 | |
242 | /// This holds both the current turn, and the highest waiting turn, |
243 | /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn)) |
244 | Futex<Atom> state_; |
245 | |
246 | /// Returns the bitmask to pass futexWait or futexWake when communicating |
247 | /// about the specified turn |
248 | uint32_t futexChannel(uint32_t turn) const noexcept { |
249 | return 1u << (turn & 31); |
250 | } |
251 | |
252 | uint32_t decodeCurrentSturn(uint32_t state) const noexcept { |
253 | return state & ~kWaitersMask; |
254 | } |
255 | |
256 | uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept { |
257 | return state & kWaitersMask; |
258 | } |
259 | |
260 | uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept { |
261 | return currentSturn | std::min(uint32_t{kWaitersMask}, maxWaiterD); |
262 | } |
263 | }; |
264 | |
265 | } // namespace detail |
266 | } // namespace folly |
267 | |