1 | /* |
2 | * Copyright 2014-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | #pragma once |
17 | |
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>

#include <folly/CPortability.h>
#include <folly/CachelinePadded.h>
#include <folly/IndexedMemPool.h>
#include <folly/Likely.h>
#include <folly/Traits.h>
#include <folly/lang/SafeAssert.h>
#include <folly/synchronization/AtomicStruct.h>
#include <folly/synchronization/SaturatingSemaphore.h>
33 | |
34 | namespace folly { |
35 | |
36 | template < |
37 | template <typename> class Atom = std::atomic, |
38 | class BatonType = SaturatingSemaphore<true, Atom>> |
39 | struct LifoSemImpl; |
40 | |
41 | /// LifoSem is a semaphore that wakes its waiters in a manner intended to |
42 | /// maximize performance rather than fairness. It should be preferred |
43 | /// to a mutex+condvar or POSIX sem_t solution when all of the waiters |
44 | /// are equivalent. It is faster than a condvar or sem_t, and it has a |
45 | /// shutdown state that might save you a lot of complexity when it comes |
46 | /// time to shut down your work pipelines. LifoSem is larger than sem_t, |
47 | /// but that is only because it uses padding and alignment to avoid |
48 | /// false sharing. |
49 | /// |
50 | /// LifoSem allows multi-post and multi-tryWait, and provides a shutdown |
51 | /// state that awakens all waiters. LifoSem is faster than sem_t because |
52 | /// it performs exact wakeups, so it often requires fewer system calls. |
53 | /// It provides all of the functionality of sem_t except for timed waiting. |
54 | /// It is called LifoSem because its wakeup policy is approximately LIFO, |
55 | /// rather than the usual FIFO. |
56 | /// |
57 | /// The core semaphore operations provided are: |
58 | /// |
59 | /// -- post() -- if there is a pending waiter, wake it up, otherwise |
60 | /// increment the value of the semaphore. If the value of the semaphore |
61 | /// is already 2^32-1, does nothing. Compare to sem_post(). |
62 | /// |
63 | /// -- post(n) -- equivalent to n calls to post(), but much more efficient. |
64 | /// sem_t has no equivalent to this method. |
65 | /// |
66 | /// -- bool tryWait() -- if the semaphore's value is positive, decrements it |
67 | /// and returns true, otherwise returns false. Compare to sem_trywait(). |
68 | /// |
69 | /// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's |
70 | /// value by n, returning the amount by which it actually was decremented |
71 | /// (a value from 0 to n inclusive). Not atomic. Equivalent to n calls |
72 | /// to tryWait(). sem_t has no equivalent to this method. |
73 | /// |
74 | /// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait(). |
75 | /// |
76 | /// -- timed wait variants - will wait until timeout. Note when these |
77 | /// timeout, the current implementation takes a lock, blocking |
78 | /// concurrent pushes and pops. (If timed wait calls are |
79 | /// substantial, consider re-working this code to be lock-free). |
80 | /// |
81 | /// LifoSem also has the notion of a shutdown state, in which any calls |
82 | /// that would block (or are already blocked) throw ShutdownSemError. |
83 | /// Note the difference between a call to wait() and a call to wait() |
84 | /// that might block. In the former case tryWait() would succeed, and no |
85 | /// isShutdown() check is performed. In the latter case an exception is |
86 | /// thrown. This behavior allows a LifoSem controlling work distribution |
87 | /// to drain. If you want to immediately stop all waiting on shutdown, |
88 | /// you can just check isShutdown() yourself (preferrably wrapped in |
89 | /// an UNLIKELY). This fast-stop behavior is easy to add, but difficult |
90 | /// to remove if you want the draining behavior, which is why we have |
91 | /// chosen the former. |
92 | /// |
93 | /// All LifoSem operations except valueGuess() are guaranteed to be |
94 | /// linearizable. |
95 | typedef LifoSemImpl<> LifoSem; |
96 | |
97 | /// The exception thrown when wait()ing on an isShutdown() LifoSem |
98 | struct FOLLY_EXPORT ShutdownSemError : public std::runtime_error { |
99 | explicit ShutdownSemError(const std::string& msg); |
100 | ~ShutdownSemError() noexcept override; |
101 | }; |
102 | |
103 | namespace detail { |
104 | |
105 | // Internally, a LifoSem is either a value or a linked list of wait nodes. |
106 | // This union is captured in the LifoSemHead type, which holds either a |
107 | // value or an indexed pointer to the list. LifoSemHead itself is a value |
108 | // type, the head is a mutable atomic box containing a LifoSemHead value. |
109 | // Each wait node corresponds to exactly one waiter. Values can flow |
110 | // through the semaphore either by going into and out of the head's value, |
111 | // or by direct communication from a poster to a waiter. The former path |
112 | // is taken when there are no pending waiters, the latter otherwise. The |
113 | // general flow of a post is to try to increment the value or pop-and-post |
114 | // a wait node. Either of those have the effect of conveying one semaphore |
115 | // unit. Waiting is the opposite, either a decrement of the value or |
116 | // push-and-wait of a wait node. The generic LifoSemBase abstracts the |
117 | // actual mechanism by which a wait node's post->wait communication is |
118 | // performed, which is why we have LifoSemRawNode and LifoSemNode. |
119 | |
120 | /// LifoSemRawNode is the actual pooled storage that backs LifoSemNode |
121 | /// for user-specified Handoff types. This is done so that we can have |
122 | /// a large static IndexedMemPool of nodes, instead of per-type pools |
123 | template <template <typename> class Atom> |
124 | struct LifoSemRawNode { |
125 | aligned_storage_for_t<void*> raw; |
126 | |
127 | /// The IndexedMemPool index of the next node in this chain, or 0 |
128 | /// if none. This will be set to uint32_t(-1) if the node is being |
129 | /// posted due to a shutdown-induced wakeup |
130 | uint32_t next; |
131 | |
132 | bool isShutdownNotice() const { |
133 | return next == uint32_t(-1); |
134 | } |
135 | void clearShutdownNotice() { |
136 | next = 0; |
137 | } |
138 | void setShutdownNotice() { |
139 | next = uint32_t(-1); |
140 | } |
141 | |
142 | typedef folly::IndexedMemPool<LifoSemRawNode<Atom>, 32, 200, Atom> Pool; |
143 | |
144 | /// Storage for all of the waiter nodes for LifoSem-s that use Atom |
145 | static Pool& pool(); |
146 | }; |
147 | |
148 | /// Use this macro to declare the static storage that backs the raw nodes |
149 | /// for the specified atomic type |
150 | #define LIFOSEM_DECLARE_POOL(Atom, capacity) \ |
151 | namespace folly { \ |
152 | namespace detail { \ |
153 | template <> \ |
154 | LifoSemRawNode<Atom>::Pool& LifoSemRawNode<Atom>::pool() { \ |
155 | static Pool* instance = new Pool((capacity)); \ |
156 | return *instance; \ |
157 | } \ |
158 | } \ |
159 | } |
160 | |
161 | /// Handoff is a type not bigger than a void* that knows how to perform a |
162 | /// single post() -> wait() communication. It must have a post() method. |
163 | /// If it has a wait() method then LifoSemBase's wait() implementation |
164 | /// will work out of the box, otherwise you will need to specialize |
165 | /// LifoSemBase::wait accordingly. |
166 | template <typename Handoff, template <typename> class Atom> |
167 | struct LifoSemNode : public LifoSemRawNode<Atom> { |
168 | static_assert( |
169 | sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw), |
170 | "Handoff too big for small-object optimization, use indirection" ); |
171 | static_assert( |
172 | alignof(Handoff) <= alignof(decltype(LifoSemRawNode<Atom>::raw)), |
173 | "Handoff alignment constraint not satisfied" ); |
174 | |
175 | template <typename... Args> |
176 | void init(Args&&... args) { |
177 | new (&this->raw) Handoff(std::forward<Args>(args)...); |
178 | } |
179 | |
180 | void destroy() { |
181 | handoff().~Handoff(); |
182 | #ifndef NDEBUG |
183 | memset(&this->raw, 'F', sizeof(this->raw)); |
184 | #endif |
185 | } |
186 | |
187 | Handoff& handoff() { |
188 | return *static_cast<Handoff*>(static_cast<void*>(&this->raw)); |
189 | } |
190 | |
191 | const Handoff& handoff() const { |
192 | return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw)); |
193 | } |
194 | }; |
195 | |
196 | template <typename Handoff, template <typename> class Atom> |
197 | struct LifoSemNodeRecycler { |
198 | void operator()(LifoSemNode<Handoff, Atom>* elem) const { |
199 | elem->destroy(); |
200 | auto idx = LifoSemRawNode<Atom>::pool().locateElem(elem); |
201 | LifoSemRawNode<Atom>::pool().recycleIndex(idx); |
202 | } |
203 | }; |
204 | |
205 | /// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state |
206 | /// bits, and a sequence number used to avoid ABA problems in the lock-free |
207 | /// management of the LifoSem's wait lists. The value can either hold |
208 | /// an integral semaphore value (if there are no waiters) or a node index |
209 | /// (see IndexedMemPool) for the head of a list of wait nodes |
210 | class LifoSemHead { |
211 | // What we really want are bitfields: |
212 | // uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31; |
213 | // Unfortunately g++ generates pretty bad code for this sometimes (I saw |
214 | // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of |
215 | // in bulk, for example). We can generate better code anyway by assuming |
216 | // that setters won't be given values that cause under/overflow, and |
217 | // putting the sequence at the end where its planned overflow doesn't |
218 | // need any masking. |
219 | // |
220 | // data == 0 (empty list) with isNodeIdx is conceptually the same |
221 | // as data == 0 (no unclaimed increments) with !isNodeIdx, we always |
222 | // convert the former into the latter to make the logic simpler. |
223 | enum { |
224 | IsNodeIdxShift = 32, |
225 | IsShutdownShift = 33, |
226 | IsLockedShift = 34, |
227 | SeqShift = 35, |
228 | }; |
229 | enum : uint64_t { |
230 | IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift, |
231 | IsShutdownMask = uint64_t(1) << IsShutdownShift, |
232 | IsLockedMask = uint64_t(1) << IsLockedShift, |
233 | SeqIncr = uint64_t(1) << SeqShift, |
234 | SeqMask = ~(SeqIncr - 1), |
235 | }; |
236 | |
237 | public: |
238 | uint64_t bits; |
239 | |
240 | //////// getters |
241 | |
242 | inline uint32_t idx() const { |
243 | assert(isNodeIdx()); |
244 | assert(uint32_t(bits) != 0); |
245 | return uint32_t(bits); |
246 | } |
247 | inline uint32_t value() const { |
248 | assert(!isNodeIdx()); |
249 | return uint32_t(bits); |
250 | } |
251 | inline constexpr bool isNodeIdx() const { |
252 | return (bits & IsNodeIdxMask) != 0; |
253 | } |
254 | inline constexpr bool isShutdown() const { |
255 | return (bits & IsShutdownMask) != 0; |
256 | } |
257 | inline constexpr bool isLocked() const { |
258 | return (bits & IsLockedMask) != 0; |
259 | } |
260 | inline constexpr uint32_t seq() const { |
261 | return uint32_t(bits >> SeqShift); |
262 | } |
263 | |
264 | //////// setter-like things return a new struct |
265 | |
266 | /// This should only be used for initial construction, not for setting |
267 | /// the value, because it clears the sequence number |
268 | static inline constexpr LifoSemHead fresh(uint32_t value) { |
269 | return LifoSemHead{value}; |
270 | } |
271 | |
272 | /// Returns the LifoSemHead that results from popping a waiter node, |
273 | /// given the current waiter node's next ptr |
274 | inline LifoSemHead withPop(uint32_t idxNext) const { |
275 | assert(!isLocked()); |
276 | assert(isNodeIdx()); |
277 | if (idxNext == 0) { |
278 | // no isNodeIdx bit or data bits. Wraparound of seq bits is okay |
279 | return LifoSemHead{(bits & (SeqMask | IsShutdownMask)) + SeqIncr}; |
280 | } else { |
281 | // preserve sequence bits (incremented with wraparound okay) and |
282 | // isNodeIdx bit, replace all data bits |
283 | return LifoSemHead{(bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) + |
284 | SeqIncr + idxNext}; |
285 | } |
286 | } |
287 | |
288 | /// Returns the LifoSemHead that results from pushing a new waiter node |
289 | inline LifoSemHead withPush(uint32_t _idx) const { |
290 | assert(!isLocked()); |
291 | assert(isNodeIdx() || value() == 0); |
292 | assert(!isShutdown()); |
293 | assert(_idx != 0); |
294 | return LifoSemHead{(bits & SeqMask) | IsNodeIdxMask | _idx}; |
295 | } |
296 | |
297 | /// Returns the LifoSemHead with value increased by delta, with |
298 | /// saturation if the maximum value is reached |
299 | inline LifoSemHead withValueIncr(uint32_t delta) const { |
300 | assert(!isLocked()); |
301 | assert(!isNodeIdx()); |
302 | auto rv = LifoSemHead{bits + SeqIncr + delta}; |
303 | if (UNLIKELY(rv.isNodeIdx())) { |
304 | // value has overflowed into the isNodeIdx bit |
305 | rv = LifoSemHead{(rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1)}; |
306 | } |
307 | return rv; |
308 | } |
309 | |
310 | /// Returns the LifoSemHead that results from decrementing the value |
311 | inline LifoSemHead withValueDecr(uint32_t delta) const { |
312 | assert(!isLocked()); |
313 | assert(delta > 0 && delta <= value()); |
314 | return LifoSemHead{bits + SeqIncr - delta}; |
315 | } |
316 | |
317 | /// Returns the LifoSemHead with the same state as the current node, |
318 | /// but with the shutdown bit set |
319 | inline LifoSemHead withShutdown() const { |
320 | return LifoSemHead{bits | IsShutdownMask}; |
321 | } |
322 | |
323 | // Returns LifoSemHead with lock bit set, but rest of bits unchanged. |
324 | inline LifoSemHead withLock() const { |
325 | assert(!isLocked()); |
326 | return LifoSemHead{bits | IsLockedMask}; |
327 | } |
328 | |
329 | // Returns LifoSemHead with lock bit unset, and updated seqno based |
330 | // on idx. |
331 | inline LifoSemHead withoutLock(uint32_t idxNext) const { |
332 | assert(isLocked()); |
333 | // We need to treat this as a pop, as we may change the list head. |
334 | return LifoSemHead{bits & ~IsLockedMask}.withPop(idxNext); |
335 | } |
336 | |
337 | inline constexpr bool operator==(const LifoSemHead& rhs) const { |
338 | return bits == rhs.bits; |
339 | } |
340 | inline constexpr bool operator!=(const LifoSemHead& rhs) const { |
341 | return !(*this == rhs); |
342 | } |
343 | }; |
344 | |
345 | /// LifoSemBase is the engine for several different types of LIFO |
346 | /// semaphore. LifoSemBase handles storage of positive semaphore values |
347 | /// and wait nodes, but the actual waiting and notification mechanism is |
348 | /// up to the client. |
349 | /// |
350 | /// The Handoff type is responsible for arranging one wakeup notification. |
351 | /// See LifoSemNode for more information on how to make your own. |
352 | template <typename Handoff, template <typename> class Atom = std::atomic> |
353 | struct LifoSemBase { |
354 | /// Constructor |
355 | constexpr explicit LifoSemBase(uint32_t initialValue = 0) |
356 | : head_(LifoSemHead::fresh(initialValue)) {} |
357 | |
358 | LifoSemBase(LifoSemBase const&) = delete; |
359 | LifoSemBase& operator=(LifoSemBase const&) = delete; |
360 | |
361 | /// Silently saturates if value is already 2^32-1 |
362 | bool post() { |
363 | auto idx = incrOrPop(1); |
364 | if (idx != 0) { |
365 | idxToNode(idx).handoff().post(); |
366 | return true; |
367 | } |
368 | return false; |
369 | } |
370 | |
371 | /// Equivalent to n calls to post(), except may be much more efficient. |
372 | /// At any point in time at which the semaphore's value would exceed |
373 | /// 2^32-1 if tracked with infinite precision, it may be silently |
374 | /// truncated to 2^32-1. This saturation is not guaranteed to be exact, |
375 | /// although it is guaranteed that overflow won't result in wrap-around. |
376 | /// There would be a substantial performance and complexity cost in |
377 | /// guaranteeing exact saturation (similar to the cost of maintaining |
378 | /// linearizability near the zero value, but without as much of |
379 | /// a benefit). |
380 | void post(uint32_t n) { |
381 | uint32_t idx; |
382 | while (n > 0 && (idx = incrOrPop(n)) != 0) { |
383 | // pop accounts for only 1 |
384 | idxToNode(idx).handoff().post(); |
385 | --n; |
386 | } |
387 | } |
388 | |
389 | /// Returns true iff shutdown() has been called |
390 | bool isShutdown() const { |
391 | return UNLIKELY(head_->load(std::memory_order_acquire).isShutdown()); |
392 | } |
393 | |
394 | /// Prevents blocking on this semaphore, causing all blocking wait() |
395 | /// calls to throw ShutdownSemError. Both currently blocked wait() and |
396 | /// future calls to wait() for which tryWait() would return false will |
397 | /// cause an exception. Calls to wait() for which the matching post() |
398 | /// has already occurred will proceed normally. |
399 | void shutdown() { |
400 | // first set the shutdown bit |
401 | auto h = head_->load(std::memory_order_acquire); |
402 | while (!h.isShutdown()) { |
403 | if (h.isLocked()) { |
404 | std::this_thread::yield(); |
405 | h = head_->load(std::memory_order_acquire); |
406 | continue; |
407 | } |
408 | |
409 | if (head_->compare_exchange_strong(h, h.withShutdown())) { |
410 | // success |
411 | h = h.withShutdown(); |
412 | break; |
413 | } |
414 | // compare_exchange_strong rereads h, retry |
415 | } |
416 | |
417 | // now wake up any waiters |
418 | while (h.isNodeIdx()) { |
419 | if (h.isLocked()) { |
420 | std::this_thread::yield(); |
421 | h = head_->load(std::memory_order_acquire); |
422 | continue; |
423 | } |
424 | auto& node = idxToNode(h.idx()); |
425 | auto repl = h.withPop(node.next); |
426 | if (head_->compare_exchange_strong(h, repl)) { |
427 | // successful pop, wake up the waiter and move on. The next |
428 | // field is used to convey that this wakeup didn't consume a value |
429 | node.setShutdownNotice(); |
430 | node.handoff().post(); |
431 | h = repl; |
432 | } |
433 | } |
434 | } |
435 | |
436 | /// Returns true iff value was decremented |
437 | bool tryWait() { |
438 | uint32_t n = 1; |
439 | auto rv = decrOrPush(n, 0); |
440 | assert( |
441 | (rv == WaitResult::DECR && n == 0) || |
442 | (rv != WaitResult::DECR && n == 1)); |
443 | // SHUTDOWN is okay here, since we don't actually wait |
444 | return rv == WaitResult::DECR; |
445 | } |
446 | |
447 | /// Equivalent to (but may be much more efficient than) n calls to |
448 | /// tryWait(). Returns the total amount by which the semaphore's value |
449 | /// was decreased |
450 | uint32_t tryWait(uint32_t n) { |
451 | auto const orig = n; |
452 | while (n > 0) { |
453 | #ifndef NDEBUG |
454 | auto prev = n; |
455 | #endif |
456 | auto rv = decrOrPush(n, 0); |
457 | assert( |
458 | (rv == WaitResult::DECR && n < prev) || |
459 | (rv != WaitResult::DECR && n == prev)); |
460 | if (rv != WaitResult::DECR) { |
461 | break; |
462 | } |
463 | } |
464 | return orig - n; |
465 | } |
466 | |
467 | /// Blocks the current thread until there is a matching post or the |
468 | /// semaphore is shut down. Throws ShutdownSemError if the semaphore |
469 | /// has been shut down and this method would otherwise be blocking. |
470 | /// Note that wait() doesn't throw during shutdown if tryWait() would |
471 | /// return true |
472 | void wait() { |
473 | auto const deadline = std::chrono::steady_clock::time_point::max(); |
474 | auto res = try_wait_until(deadline); |
475 | FOLLY_SAFE_DCHECK(res, "infinity time has passed" ); |
476 | } |
477 | |
478 | template <typename Rep, typename Period> |
479 | bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) { |
480 | return try_wait_until(timeout + std::chrono::steady_clock::now()); |
481 | } |
482 | |
483 | template <typename Clock, typename Duration> |
484 | bool try_wait_until( |
485 | const std::chrono::time_point<Clock, Duration>& deadline) { |
486 | // early check isn't required for correctness, but is an important |
487 | // perf win if we can avoid allocating and deallocating a node |
488 | if (tryWait()) { |
489 | return true; |
490 | } |
491 | |
492 | // allocateNode() won't compile unless Handoff has a default |
493 | // constructor |
494 | UniquePtr node = allocateNode(); |
495 | |
496 | auto rv = tryWaitOrPush(*node); |
497 | if (UNLIKELY(rv == WaitResult::SHUTDOWN)) { |
498 | assert(isShutdown()); |
499 | throw ShutdownSemError("wait() would block but semaphore is shut down" ); |
500 | } |
501 | |
502 | if (rv == WaitResult::PUSH) { |
503 | if (!node->handoff().try_wait_until(deadline)) { |
504 | if (tryRemoveNode(*node)) { |
505 | return false; |
506 | } else { |
507 | // We could not remove our node. Return to waiting. |
508 | // |
509 | // This only happens if we lose a removal race with post(), |
510 | // so we are not likely to wait long. This is only |
511 | // necessary to ensure we don't return node's memory back to |
512 | // IndexedMemPool before post() has had a chance to post to |
513 | // handoff(). In a stronger memory reclamation scheme, such |
514 | // as hazptr or rcu, this would not be necessary. |
515 | node->handoff().wait(); |
516 | } |
517 | } |
518 | if (UNLIKELY(node->isShutdownNotice())) { |
519 | // this wait() didn't consume a value, it was triggered by shutdown |
520 | throw ShutdownSemError( |
521 | "blocking wait() interrupted by semaphore shutdown" ); |
522 | } |
523 | |
524 | // node->handoff().wait() can't return until after the node has |
525 | // been popped and post()ed, so it is okay for the UniquePtr to |
526 | // recycle the node now |
527 | } |
528 | // else node wasn't pushed, so it is safe to recycle |
529 | return true; |
530 | } |
531 | |
532 | /// Returns a guess at the current value, designed for debugging. |
533 | /// If there are no concurrent posters or waiters then this will |
534 | /// be correct |
535 | uint32_t valueGuess() const { |
536 | // this is actually linearizable, but we don't promise that because |
537 | // we may want to add striping in the future to help under heavy |
538 | // contention |
539 | auto h = head_->load(std::memory_order_acquire); |
540 | return h.isNodeIdx() ? 0 : h.value(); |
541 | } |
542 | |
543 | protected: |
544 | enum class WaitResult { |
545 | PUSH, |
546 | DECR, |
547 | SHUTDOWN, |
548 | }; |
549 | |
550 | /// The type of a std::unique_ptr that will automatically return a |
551 | /// LifoSemNode to the appropriate IndexedMemPool |
552 | typedef std:: |
553 | unique_ptr<LifoSemNode<Handoff, Atom>, LifoSemNodeRecycler<Handoff, Atom>> |
554 | UniquePtr; |
555 | |
556 | /// Returns a node that can be passed to decrOrLink |
557 | template <typename... Args> |
558 | UniquePtr allocateNode(Args&&... args) { |
559 | auto idx = LifoSemRawNode<Atom>::pool().allocIndex(); |
560 | if (idx != 0) { |
561 | auto& node = idxToNode(idx); |
562 | node.clearShutdownNotice(); |
563 | try { |
564 | node.init(std::forward<Args>(args)...); |
565 | } catch (...) { |
566 | LifoSemRawNode<Atom>::pool().recycleIndex(idx); |
567 | throw; |
568 | } |
569 | return UniquePtr(&node); |
570 | } else { |
571 | return UniquePtr(); |
572 | } |
573 | } |
574 | |
575 | /// Returns DECR if the semaphore value was decremented (and waiterNode |
576 | /// was untouched), PUSH if a reference to the wait node was pushed, |
577 | /// or SHUTDOWN if decrement was not possible and push wasn't allowed |
578 | /// because isShutdown(). Ownership of the wait node remains the |
579 | /// responsibility of the caller, who must not release it until after |
580 | /// the node's Handoff has been posted. |
581 | WaitResult tryWaitOrPush(LifoSemNode<Handoff, Atom>& waiterNode) { |
582 | uint32_t n = 1; |
583 | return decrOrPush(n, nodeToIdx(waiterNode)); |
584 | } |
585 | |
586 | private: |
587 | CachelinePadded<folly::AtomicStruct<LifoSemHead, Atom>> head_; |
588 | |
589 | static LifoSemNode<Handoff, Atom>& idxToNode(uint32_t idx) { |
590 | auto raw = &LifoSemRawNode<Atom>::pool()[idx]; |
591 | return *static_cast<LifoSemNode<Handoff, Atom>*>(raw); |
592 | } |
593 | |
594 | static uint32_t nodeToIdx(const LifoSemNode<Handoff, Atom>& node) { |
595 | return LifoSemRawNode<Atom>::pool().locateElem(&node); |
596 | } |
597 | |
598 | // Locks the list head (blocking concurrent pushes and pops) |
599 | // and attempts to remove this node. Returns true if node was |
600 | // found and removed, false if not found. |
601 | bool tryRemoveNode(const LifoSemNode<Handoff, Atom>& removenode) { |
602 | auto removeidx = nodeToIdx(removenode); |
603 | auto head = head_->load(std::memory_order_acquire); |
604 | // Try to lock the head. |
605 | while (true) { |
606 | if (head.isLocked()) { |
607 | std::this_thread::yield(); |
608 | head = head_->load(std::memory_order_acquire); |
609 | continue; |
610 | } |
611 | if (!head.isNodeIdx()) { |
612 | return false; |
613 | } |
614 | if (head_->compare_exchange_weak( |
615 | head, |
616 | head.withLock(), |
617 | std::memory_order_acquire, |
618 | std::memory_order_relaxed)) { |
619 | break; |
620 | } |
621 | } |
622 | // Update local var to what head_ is, for better assert() checking. |
623 | head = head.withLock(); |
624 | bool result = false; |
625 | auto idx = head.idx(); |
626 | if (idx == removeidx) { |
627 | // pop from head. Head seqno is updated. |
628 | head_->store( |
629 | head.withoutLock(removenode.next), std::memory_order_release); |
630 | return true; |
631 | } |
632 | auto node = &idxToNode(idx); |
633 | idx = node->next; |
634 | while (idx) { |
635 | if (idx == removeidx) { |
636 | // Pop from mid-list. |
637 | node->next = removenode.next; |
638 | result = true; |
639 | break; |
640 | } |
641 | node = &idxToNode(idx); |
642 | idx = node->next; |
643 | } |
644 | // Unlock and return result |
645 | head_->store(head.withoutLock(head.idx()), std::memory_order_release); |
646 | return result; |
647 | } |
648 | |
649 | /// Either increments by n and returns 0, or pops a node and returns it. |
650 | /// If n + the stripe's value overflows, then the stripe's value |
651 | /// saturates silently at 2^32-1 |
652 | uint32_t incrOrPop(uint32_t n) { |
653 | while (true) { |
654 | assert(n > 0); |
655 | |
656 | auto head = head_->load(std::memory_order_acquire); |
657 | if (head.isLocked()) { |
658 | std::this_thread::yield(); |
659 | continue; |
660 | } |
661 | if (head.isNodeIdx()) { |
662 | auto& node = idxToNode(head.idx()); |
663 | if (head_->compare_exchange_strong(head, head.withPop(node.next))) { |
664 | // successful pop |
665 | return head.idx(); |
666 | } |
667 | } else { |
668 | auto after = head.withValueIncr(n); |
669 | if (head_->compare_exchange_strong(head, after)) { |
670 | // successful incr |
671 | return 0; |
672 | } |
673 | } |
674 | // retry |
675 | } |
676 | } |
677 | |
678 | /// Returns DECR if some amount was decremented, with that amount |
679 | /// subtracted from n. If n is 1 and this function returns DECR then n |
680 | /// must be 0 afterward. Returns PUSH if no value could be decremented |
681 | /// and idx was pushed, or if idx was zero and no push was performed but |
682 | /// a push would have been performed with a valid node. Returns SHUTDOWN |
683 | /// if the caller should have blocked but isShutdown(). If idx == 0, |
684 | /// may return PUSH even after isShutdown() or may return SHUTDOWN |
685 | WaitResult decrOrPush(uint32_t& n, uint32_t idx) { |
686 | assert(n > 0); |
687 | |
688 | while (true) { |
689 | auto head = head_->load(std::memory_order_acquire); |
690 | |
691 | if (head.isLocked()) { |
692 | std::this_thread::yield(); |
693 | continue; |
694 | } |
695 | |
696 | if (!head.isNodeIdx() && head.value() > 0) { |
697 | // decr |
698 | auto delta = std::min(n, head.value()); |
699 | if (head_->compare_exchange_strong(head, head.withValueDecr(delta))) { |
700 | n -= delta; |
701 | return WaitResult::DECR; |
702 | } |
703 | } else { |
704 | // push |
705 | if (idx == 0) { |
706 | return WaitResult::PUSH; |
707 | } |
708 | |
709 | if (UNLIKELY(head.isShutdown())) { |
710 | return WaitResult::SHUTDOWN; |
711 | } |
712 | |
713 | auto& node = idxToNode(idx); |
714 | node.next = head.isNodeIdx() ? head.idx() : 0; |
715 | if (head_->compare_exchange_strong(head, head.withPush(idx))) { |
716 | // push succeeded |
717 | return WaitResult::PUSH; |
718 | } |
719 | } |
720 | } |
721 | // retry |
722 | } |
723 | }; |
724 | |
725 | } // namespace detail |
726 | |
727 | template <template <typename> class Atom, class BatonType> |
728 | struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> { |
729 | constexpr explicit LifoSemImpl(uint32_t v = 0) |
730 | : detail::LifoSemBase<BatonType, Atom>(v) {} |
731 | }; |
732 | |
733 | } // namespace folly |
734 | |