1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#pragma once
17
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>

#include <folly/CPortability.h>
#include <folly/CachelinePadded.h>
#include <folly/IndexedMemPool.h>
#include <folly/Likely.h>
#include <folly/Traits.h>
#include <folly/lang/SafeAssert.h>
#include <folly/synchronization/AtomicStruct.h>
#include <folly/synchronization/SaturatingSemaphore.h>
33
34namespace folly {
35
template <
    template <typename> class Atom = std::atomic,
    class BatonType = SaturatingSemaphore<true, Atom>>
struct LifoSemImpl;

/// LifoSem is a semaphore that wakes its waiters in a manner intended to
/// maximize performance rather than fairness. It should be preferred
/// to a mutex+condvar or POSIX sem_t solution when all of the waiters
/// are equivalent. It is faster than a condvar or sem_t, and it has a
/// shutdown state that might save you a lot of complexity when it comes
/// time to shut down your work pipelines. LifoSem is larger than sem_t,
/// but that is only because it uses padding and alignment to avoid
/// false sharing.
///
/// LifoSem allows multi-post and multi-tryWait, and provides a shutdown
/// state that awakens all waiters. LifoSem is faster than sem_t because
/// it performs exact wakeups, so it often requires fewer system calls.
/// It provides all of the functionality of sem_t except for timed waiting.
/// It is called LifoSem because its wakeup policy is approximately LIFO,
/// rather than the usual FIFO.
///
/// The core semaphore operations provided are:
///
/// -- post() -- if there is a pending waiter, wake it up, otherwise
/// increment the value of the semaphore. If the value of the semaphore
/// is already 2^32-1, does nothing. Compare to sem_post().
///
/// -- post(n) -- equivalent to n calls to post(), but much more efficient.
/// sem_t has no equivalent to this method.
///
/// -- bool tryWait() -- if the semaphore's value is positive, decrements it
/// and returns true, otherwise returns false. Compare to sem_trywait().
///
/// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's
/// value by n, returning the amount by which it actually was decremented
/// (a value from 0 to n inclusive). Not atomic. Equivalent to n calls
/// to tryWait(). sem_t has no equivalent to this method.
///
/// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait().
///
/// -- timed wait variants - will wait until timeout. Note when these
/// timeout, the current implementation takes a lock, blocking
/// concurrent pushes and pops. (If timed wait calls are
/// substantial, consider re-working this code to be lock-free).
///
/// LifoSem also has the notion of a shutdown state, in which any calls
/// that would block (or are already blocked) throw ShutdownSemError.
/// Note the difference between a call to wait() and a call to wait()
/// that might block. In the former case tryWait() would succeed, and no
/// isShutdown() check is performed. In the latter case an exception is
/// thrown. This behavior allows a LifoSem controlling work distribution
/// to drain. If you want to immediately stop all waiting on shutdown,
/// you can just check isShutdown() yourself (preferably wrapped in
/// an UNLIKELY). This fast-stop behavior is easy to add, but difficult
/// to remove if you want the draining behavior, which is why we have
/// chosen the former.
///
/// All LifoSem operations except valueGuess() are guaranteed to be
/// linearizable.
typedef LifoSemImpl<> LifoSem;
96
/// The exception thrown when wait()ing on an isShutdown() LifoSem.
/// The constructor and destructor are declared here and defined out of
/// line (in the corresponding .cpp file).
struct FOLLY_EXPORT ShutdownSemError : public std::runtime_error {
  explicit ShutdownSemError(const std::string& msg);
  ~ShutdownSemError() noexcept override;
};
102
103namespace detail {
104
105// Internally, a LifoSem is either a value or a linked list of wait nodes.
106// This union is captured in the LifoSemHead type, which holds either a
107// value or an indexed pointer to the list. LifoSemHead itself is a value
108// type, the head is a mutable atomic box containing a LifoSemHead value.
109// Each wait node corresponds to exactly one waiter. Values can flow
110// through the semaphore either by going into and out of the head's value,
111// or by direct communication from a poster to a waiter. The former path
112// is taken when there are no pending waiters, the latter otherwise. The
113// general flow of a post is to try to increment the value or pop-and-post
// a wait node. Either of those has the effect of conveying one semaphore
115// unit. Waiting is the opposite, either a decrement of the value or
116// push-and-wait of a wait node. The generic LifoSemBase abstracts the
117// actual mechanism by which a wait node's post->wait communication is
118// performed, which is why we have LifoSemRawNode and LifoSemNode.
119
120/// LifoSemRawNode is the actual pooled storage that backs LifoSemNode
121/// for user-specified Handoff types. This is done so that we can have
122/// a large static IndexedMemPool of nodes, instead of per-type pools
123template <template <typename> class Atom>
124struct LifoSemRawNode {
125 aligned_storage_for_t<void*> raw;
126
127 /// The IndexedMemPool index of the next node in this chain, or 0
128 /// if none. This will be set to uint32_t(-1) if the node is being
129 /// posted due to a shutdown-induced wakeup
130 uint32_t next;
131
132 bool isShutdownNotice() const {
133 return next == uint32_t(-1);
134 }
135 void clearShutdownNotice() {
136 next = 0;
137 }
138 void setShutdownNotice() {
139 next = uint32_t(-1);
140 }
141
142 typedef folly::IndexedMemPool<LifoSemRawNode<Atom>, 32, 200, Atom> Pool;
143
144 /// Storage for all of the waiter nodes for LifoSem-s that use Atom
145 static Pool& pool();
146};
147
/// Use this macro to declare the static storage that backs the raw nodes
/// for the specified atomic type.  It must appear at global namespace
/// scope (the expansion opens namespace folly itself), in exactly one
/// translation unit per Atom type.  The pool is heap-allocated on first
/// use and intentionally never deleted.
#define LIFOSEM_DECLARE_POOL(Atom, capacity) \
 namespace folly { \
 namespace detail { \
 template <> \
 LifoSemRawNode<Atom>::Pool& LifoSemRawNode<Atom>::pool() { \
 static Pool* instance = new Pool((capacity)); \
 return *instance; \
 } \
 } \
 }
160
161/// Handoff is a type not bigger than a void* that knows how to perform a
162/// single post() -> wait() communication. It must have a post() method.
163/// If it has a wait() method then LifoSemBase's wait() implementation
164/// will work out of the box, otherwise you will need to specialize
165/// LifoSemBase::wait accordingly.
166template <typename Handoff, template <typename> class Atom>
167struct LifoSemNode : public LifoSemRawNode<Atom> {
168 static_assert(
169 sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw),
170 "Handoff too big for small-object optimization, use indirection");
171 static_assert(
172 alignof(Handoff) <= alignof(decltype(LifoSemRawNode<Atom>::raw)),
173 "Handoff alignment constraint not satisfied");
174
175 template <typename... Args>
176 void init(Args&&... args) {
177 new (&this->raw) Handoff(std::forward<Args>(args)...);
178 }
179
180 void destroy() {
181 handoff().~Handoff();
182#ifndef NDEBUG
183 memset(&this->raw, 'F', sizeof(this->raw));
184#endif
185 }
186
187 Handoff& handoff() {
188 return *static_cast<Handoff*>(static_cast<void*>(&this->raw));
189 }
190
191 const Handoff& handoff() const {
192 return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw));
193 }
194};
195
196template <typename Handoff, template <typename> class Atom>
197struct LifoSemNodeRecycler {
198 void operator()(LifoSemNode<Handoff, Atom>* elem) const {
199 elem->destroy();
200 auto idx = LifoSemRawNode<Atom>::pool().locateElem(elem);
201 LifoSemRawNode<Atom>::pool().recycleIndex(idx);
202 }
203};
204
/// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state
/// bits, and a sequence number used to avoid ABA problems in the lock-free
/// management of the LifoSem's wait lists. The value can either hold
/// an integral semaphore value (if there are no waiters) or a node index
/// (see IndexedMemPool) for the head of a list of wait nodes
class LifoSemHead {
  // What we really want are bitfields:
  //  uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31;
  // Unfortunately g++ generates pretty bad code for this sometimes (I saw
  // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of
  // in bulk, for example). We can generate better code anyway by assuming
  // that setters won't be given values that cause under/overflow, and
  // putting the sequence at the end where its planned overflow doesn't
  // need any masking.
  //
  // data == 0 (empty list) with isNodeIdx is conceptually the same
  // as data == 0 (no unclaimed increments) with !isNodeIdx, we always
  // convert the former into the latter to make the logic simpler.
  //
  // Bit layout (low to high): [0,32) data, 32 isNodeIdx, 33 isShutdown,
  // 34 isLocked, [35,64) sequence.
  enum {
    IsNodeIdxShift = 32,
    IsShutdownShift = 33,
    IsLockedShift = 34,
    SeqShift = 35,
  };
  enum : uint64_t {
    IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
    IsShutdownMask = uint64_t(1) << IsShutdownShift,
    IsLockedMask = uint64_t(1) << IsLockedShift,
    // adding SeqIncr bumps the sequence number; wraparound of the
    // sequence bits at the top of the word is expected and harmless
    SeqIncr = uint64_t(1) << SeqShift,
    SeqMask = ~(SeqIncr - 1),
  };

 public:
  uint64_t bits;

  //////// getters

  /// Index of the head wait node; only meaningful when isNodeIdx()
  inline uint32_t idx() const {
    assert(isNodeIdx());
    assert(uint32_t(bits) != 0);
    return uint32_t(bits);
  }
  /// Current semaphore value; only meaningful when !isNodeIdx()
  inline uint32_t value() const {
    assert(!isNodeIdx());
    return uint32_t(bits);
  }
  /// True when the data bits hold a wait-node index rather than a value
  inline constexpr bool isNodeIdx() const {
    return (bits & IsNodeIdxMask) != 0;
  }
  /// True once shutdown() has marked the head
  inline constexpr bool isShutdown() const {
    return (bits & IsShutdownMask) != 0;
  }
  /// True while a timed-out waiter holds the list lock (see withLock)
  inline constexpr bool isLocked() const {
    return (bits & IsLockedMask) != 0;
  }
  /// ABA-avoidance sequence number
  inline constexpr uint32_t seq() const {
    return uint32_t(bits >> SeqShift);
  }

  //////// setter-like things return a new struct

  /// This should only be used for initial construction, not for setting
  /// the value, because it clears the sequence number
  static inline constexpr LifoSemHead fresh(uint32_t value) {
    return LifoSemHead{value};
  }

  /// Returns the LifoSemHead that results from popping a waiter node,
  /// given the current waiter node's next ptr
  inline LifoSemHead withPop(uint32_t idxNext) const {
    assert(!isLocked());
    assert(isNodeIdx());
    if (idxNext == 0) {
      // no isNodeIdx bit or data bits. Wraparound of seq bits is okay
      return LifoSemHead{(bits & (SeqMask | IsShutdownMask)) + SeqIncr};
    } else {
      // preserve sequence bits (incremented with wraparound okay) and
      // isNodeIdx bit, replace all data bits
      return LifoSemHead{(bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) +
                         SeqIncr + idxNext};
    }
  }

  /// Returns the LifoSemHead that results from pushing a new waiter node
  inline LifoSemHead withPush(uint32_t _idx) const {
    assert(!isLocked());
    assert(isNodeIdx() || value() == 0);
    assert(!isShutdown());
    assert(_idx != 0);
    // a push doesn't bump the sequence number; the seq bits are carried
    // over unchanged while the data bits are replaced by the node index
    return LifoSemHead{(bits & SeqMask) | IsNodeIdxMask | _idx};
  }

  /// Returns the LifoSemHead with value increased by delta, with
  /// saturation if the maximum value is reached
  inline LifoSemHead withValueIncr(uint32_t delta) const {
    assert(!isLocked());
    assert(!isNodeIdx());
    auto rv = LifoSemHead{bits + SeqIncr + delta};
    if (UNLIKELY(rv.isNodeIdx())) {
      // value has overflowed into the isNodeIdx bit; clear the carried
      // bit and saturate the value at 2^32-1
      rv = LifoSemHead{(rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1)};
    }
    return rv;
  }

  /// Returns the LifoSemHead that results from decrementing the value
  inline LifoSemHead withValueDecr(uint32_t delta) const {
    assert(!isLocked());
    assert(delta > 0 && delta <= value());
    return LifoSemHead{bits + SeqIncr - delta};
  }

  /// Returns the LifoSemHead with the same state as the current node,
  /// but with the shutdown bit set
  inline LifoSemHead withShutdown() const {
    return LifoSemHead{bits | IsShutdownMask};
  }

  // Returns LifoSemHead with lock bit set, but rest of bits unchanged.
  inline LifoSemHead withLock() const {
    assert(!isLocked());
    return LifoSemHead{bits | IsLockedMask};
  }

  // Returns LifoSemHead with lock bit unset, and updated seqno based
  // on idx.
  inline LifoSemHead withoutLock(uint32_t idxNext) const {
    assert(isLocked());
    // We need to treat this as a pop, as we may change the list head.
    return LifoSemHead{bits & ~IsLockedMask}.withPop(idxNext);
  }

  inline constexpr bool operator==(const LifoSemHead& rhs) const {
    return bits == rhs.bits;
  }
  inline constexpr bool operator!=(const LifoSemHead& rhs) const {
    return !(*this == rhs);
  }
};
344
/// LifoSemBase is the engine for several different types of LIFO
/// semaphore. LifoSemBase handles storage of positive semaphore values
/// and wait nodes, but the actual waiting and notification mechanism is
/// up to the client.
///
/// The Handoff type is responsible for arranging one wakeup notification.
/// See LifoSemNode for more information on how to make your own.
template <typename Handoff, template <typename> class Atom = std::atomic>
struct LifoSemBase {
  /// Constructs a semaphore whose value starts at initialValue
  constexpr explicit LifoSemBase(uint32_t initialValue = 0)
      : head_(LifoSemHead::fresh(initialValue)) {}

  // Not copyable
  LifoSemBase(LifoSemBase const&) = delete;
  LifoSemBase& operator=(LifoSemBase const&) = delete;

  /// Silently saturates if value is already 2^32-1.
  /// Returns true iff a pending waiter was woken (as opposed to the
  /// semaphore value being incremented).
  bool post() {
    auto idx = incrOrPop(1);
    if (idx != 0) {
      // a waiter node was popped; convey the unit directly to it
      idxToNode(idx).handoff().post();
      return true;
    }
    return false;
  }

  /// Equivalent to n calls to post(), except may be much more efficient.
  /// At any point in time at which the semaphore's value would exceed
  /// 2^32-1 if tracked with infinite precision, it may be silently
  /// truncated to 2^32-1. This saturation is not guaranteed to be exact,
  /// although it is guaranteed that overflow won't result in wrap-around.
  /// There would be a substantial performance and complexity cost in
  /// guaranteeing exact saturation (similar to the cost of maintaining
  /// linearizability near the zero value, but without as much of
  /// a benefit).
  void post(uint32_t n) {
    uint32_t idx;
    while (n > 0 && (idx = incrOrPop(n)) != 0) {
      // pop accounts for only 1
      idxToNode(idx).handoff().post();
      --n;
    }
  }

  /// Returns true iff shutdown() has been called
  bool isShutdown() const {
    return UNLIKELY(head_->load(std::memory_order_acquire).isShutdown());
  }

  /// Prevents blocking on this semaphore, causing all blocking wait()
  /// calls to throw ShutdownSemError. Both currently blocked wait() and
  /// future calls to wait() for which tryWait() would return false will
  /// cause an exception. Calls to wait() for which the matching post()
  /// has already occurred will proceed normally.
  void shutdown() {
    // first set the shutdown bit
    auto h = head_->load(std::memory_order_acquire);
    while (!h.isShutdown()) {
      if (h.isLocked()) {
        // a timed-out waiter holds the list lock; spin until released
        std::this_thread::yield();
        h = head_->load(std::memory_order_acquire);
        continue;
      }

      if (head_->compare_exchange_strong(h, h.withShutdown())) {
        // success
        h = h.withShutdown();
        break;
      }
      // compare_exchange_strong rereads h, retry
    }

    // now wake up any waiters
    while (h.isNodeIdx()) {
      if (h.isLocked()) {
        std::this_thread::yield();
        h = head_->load(std::memory_order_acquire);
        continue;
      }
      auto& node = idxToNode(h.idx());
      auto repl = h.withPop(node.next);
      if (head_->compare_exchange_strong(h, repl)) {
        // successful pop, wake up the waiter and move on. The next
        // field is used to convey that this wakeup didn't consume a value
        node.setShutdownNotice();
        node.handoff().post();
        h = repl;
      }
    }
  }

  /// Returns true iff value was decremented
  bool tryWait() {
    uint32_t n = 1;
    auto rv = decrOrPush(n, 0);
    assert(
        (rv == WaitResult::DECR && n == 0) ||
        (rv != WaitResult::DECR && n == 1));
    // SHUTDOWN is okay here, since we don't actually wait
    return rv == WaitResult::DECR;
  }

  /// Equivalent to (but may be much more efficient than) n calls to
  /// tryWait(). Returns the total amount by which the semaphore's value
  /// was decreased
  uint32_t tryWait(uint32_t n) {
    auto const orig = n;
    while (n > 0) {
#ifndef NDEBUG
      auto prev = n;
#endif
      auto rv = decrOrPush(n, 0);
      assert(
          (rv == WaitResult::DECR && n < prev) ||
          (rv != WaitResult::DECR && n == prev));
      if (rv != WaitResult::DECR) {
        break;
      }
    }
    return orig - n;
  }

  /// Blocks the current thread until there is a matching post or the
  /// semaphore is shut down. Throws ShutdownSemError if the semaphore
  /// has been shut down and this method would otherwise be blocking.
  /// Note that wait() doesn't throw during shutdown if tryWait() would
  /// return true
  void wait() {
    // waiting forever is expressed as a deadline at the end of time
    auto const deadline = std::chrono::steady_clock::time_point::max();
    auto res = try_wait_until(deadline);
    FOLLY_SAFE_DCHECK(res, "infinity time has passed");
  }

  /// Like wait(), but gives up after the relative timeout has elapsed.
  /// Returns true iff a unit was consumed.
  template <typename Rep, typename Period>
  bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) {
    return try_wait_until(timeout + std::chrono::steady_clock::now());
  }

  /// Like wait(), but gives up at the absolute deadline. Returns true
  /// iff a unit was consumed. Throws ShutdownSemError if the semaphore
  /// is shut down and the call would otherwise block.
  template <typename Clock, typename Duration>
  bool try_wait_until(
      const std::chrono::time_point<Clock, Duration>& deadline) {
    // early check isn't required for correctness, but is an important
    // perf win if we can avoid allocating and deallocating a node
    if (tryWait()) {
      return true;
    }

    // allocateNode() won't compile unless Handoff has a default
    // constructor
    UniquePtr node = allocateNode();

    auto rv = tryWaitOrPush(*node);
    if (UNLIKELY(rv == WaitResult::SHUTDOWN)) {
      assert(isShutdown());
      throw ShutdownSemError("wait() would block but semaphore is shut down");
    }

    if (rv == WaitResult::PUSH) {
      if (!node->handoff().try_wait_until(deadline)) {
        if (tryRemoveNode(*node)) {
          return false;
        } else {
          // We could not remove our node. Return to waiting.
          //
          // This only happens if we lose a removal race with post(),
          // so we are not likely to wait long. This is only
          // necessary to ensure we don't return node's memory back to
          // IndexedMemPool before post() has had a chance to post to
          // handoff(). In a stronger memory reclamation scheme, such
          // as hazptr or rcu, this would not be necessary.
          node->handoff().wait();
        }
      }
      if (UNLIKELY(node->isShutdownNotice())) {
        // this wait() didn't consume a value, it was triggered by shutdown
        throw ShutdownSemError(
            "blocking wait() interrupted by semaphore shutdown");
      }

      // node->handoff().wait() can't return until after the node has
      // been popped and post()ed, so it is okay for the UniquePtr to
      // recycle the node now
    }
    // else node wasn't pushed, so it is safe to recycle
    return true;
  }

  /// Returns a guess at the current value, designed for debugging.
  /// If there are no concurrent posters or waiters then this will
  /// be correct
  uint32_t valueGuess() const {
    // this is actually linearizable, but we don't promise that because
    // we may want to add striping in the future to help under heavy
    // contention
    auto h = head_->load(std::memory_order_acquire);
    return h.isNodeIdx() ? 0 : h.value();
  }

 protected:
  /// Outcome of the decrement-or-enqueue step in decrOrPush
  enum class WaitResult {
    PUSH,
    DECR,
    SHUTDOWN,
  };

  /// The type of a std::unique_ptr that will automatically return a
  /// LifoSemNode to the appropriate IndexedMemPool
  typedef std::
      unique_ptr<LifoSemNode<Handoff, Atom>, LifoSemNodeRecycler<Handoff, Atom>>
          UniquePtr;

  /// Returns a node that can be passed to decrOrLink
  template <typename... Args>
  UniquePtr allocateNode(Args&&... args) {
    auto idx = LifoSemRawNode<Atom>::pool().allocIndex();
    if (idx != 0) {
      auto& node = idxToNode(idx);
      node.clearShutdownNotice();
      try {
        node.init(std::forward<Args>(args)...);
      } catch (...) {
        // don't leak the pool slot if the Handoff constructor throws
        LifoSemRawNode<Atom>::pool().recycleIndex(idx);
        throw;
      }
      return UniquePtr(&node);
    } else {
      // pool exhausted
      return UniquePtr();
    }
  }

  /// Returns DECR if the semaphore value was decremented (and waiterNode
  /// was untouched), PUSH if a reference to the wait node was pushed,
  /// or SHUTDOWN if decrement was not possible and push wasn't allowed
  /// because isShutdown(). Ownership of the wait node remains the
  /// responsibility of the caller, who must not release it until after
  /// the node's Handoff has been posted.
  WaitResult tryWaitOrPush(LifoSemNode<Handoff, Atom>& waiterNode) {
    uint32_t n = 1;
    return decrOrPush(n, nodeToIdx(waiterNode));
  }

 private:
  // The entire semaphore state: either a value or the head of the wait
  // list, plus shutdown/lock/sequence bits (see LifoSemHead). Padded
  // to occupy its own cache line to avoid false sharing.
  CachelinePadded<folly::AtomicStruct<LifoSemHead, Atom>> head_;

  // Maps an IndexedMemPool index to the typed wait node it denotes
  static LifoSemNode<Handoff, Atom>& idxToNode(uint32_t idx) {
    auto raw = &LifoSemRawNode<Atom>::pool()[idx];
    return *static_cast<LifoSemNode<Handoff, Atom>*>(raw);
  }

  // Maps a wait node back to its IndexedMemPool index
  static uint32_t nodeToIdx(const LifoSemNode<Handoff, Atom>& node) {
    return LifoSemRawNode<Atom>::pool().locateElem(&node);
  }

  // Locks the list head (blocking concurrent pushes and pops)
  // and attempts to remove this node. Returns true if node was
  // found and removed, false if not found.
  bool tryRemoveNode(const LifoSemNode<Handoff, Atom>& removenode) {
    auto removeidx = nodeToIdx(removenode);
    auto head = head_->load(std::memory_order_acquire);
    // Try to lock the head.
    while (true) {
      if (head.isLocked()) {
        std::this_thread::yield();
        head = head_->load(std::memory_order_acquire);
        continue;
      }
      if (!head.isNodeIdx()) {
        // list is empty, so our node must already have been popped
        return false;
      }
      if (head_->compare_exchange_weak(
              head,
              head.withLock(),
              std::memory_order_acquire,
              std::memory_order_relaxed)) {
        break;
      }
    }
    // Update local var to what head_ is, for better assert() checking.
    head = head.withLock();
    bool result = false;
    auto idx = head.idx();
    if (idx == removeidx) {
      // pop from head. Head seqno is updated.
      head_->store(
          head.withoutLock(removenode.next), std::memory_order_release);
      return true;
    }
    // walk the singly-linked list looking for our node
    auto node = &idxToNode(idx);
    idx = node->next;
    while (idx) {
      if (idx == removeidx) {
        // Pop from mid-list.
        node->next = removenode.next;
        result = true;
        break;
      }
      node = &idxToNode(idx);
      idx = node->next;
    }
    // Unlock and return result
    head_->store(head.withoutLock(head.idx()), std::memory_order_release);
    return result;
  }

  /// Either increments by n and returns 0, or pops a node and returns it.
  /// If n + the stripe's value overflows, then the stripe's value
  /// saturates silently at 2^32-1
  uint32_t incrOrPop(uint32_t n) {
    while (true) {
      assert(n > 0);

      auto head = head_->load(std::memory_order_acquire);
      if (head.isLocked()) {
        // a timed-out waiter holds the list lock; spin until released
        std::this_thread::yield();
        continue;
      }
      if (head.isNodeIdx()) {
        auto& node = idxToNode(head.idx());
        if (head_->compare_exchange_strong(head, head.withPop(node.next))) {
          // successful pop
          return head.idx();
        }
      } else {
        auto after = head.withValueIncr(n);
        if (head_->compare_exchange_strong(head, after)) {
          // successful incr
          return 0;
        }
      }
      // retry
    }
  }

  /// Returns DECR if some amount was decremented, with that amount
  /// subtracted from n. If n is 1 and this function returns DECR then n
  /// must be 0 afterward. Returns PUSH if no value could be decremented
  /// and idx was pushed, or if idx was zero and no push was performed but
  /// a push would have been performed with a valid node. Returns SHUTDOWN
  /// if the caller should have blocked but isShutdown(). If idx == 0,
  /// may return PUSH even after isShutdown() or may return SHUTDOWN
  WaitResult decrOrPush(uint32_t& n, uint32_t idx) {
    assert(n > 0);

    while (true) {
      auto head = head_->load(std::memory_order_acquire);

      if (head.isLocked()) {
        std::this_thread::yield();
        continue;
      }

      if (!head.isNodeIdx() && head.value() > 0) {
        // decr
        auto delta = std::min(n, head.value());
        if (head_->compare_exchange_strong(head, head.withValueDecr(delta))) {
          n -= delta;
          return WaitResult::DECR;
        }
      } else {
        // push
        if (idx == 0) {
          // caller only wanted to know whether a push would happen
          return WaitResult::PUSH;
        }

        if (UNLIKELY(head.isShutdown())) {
          return WaitResult::SHUTDOWN;
        }

        auto& node = idxToNode(idx);
        node.next = head.isNodeIdx() ? head.idx() : 0;
        if (head_->compare_exchange_strong(head, head.withPush(idx))) {
          // push succeeded
          return WaitResult::PUSH;
        }
      }
    }
    // retry
  }
};
724
725} // namespace detail
726
727template <template <typename> class Atom, class BatonType>
728struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> {
729 constexpr explicit LifoSemImpl(uint32_t v = 0)
730 : detail::LifoSemBase<BatonType, Atom>(v) {}
731};
732
733} // namespace folly
734