/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <chrono>
#include <memory>

#include <glog/logging.h>

#include <folly/ConstexprMath.h>
#include <folly/Optional.h>
#include <folly/Traits.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/lang/Align.h>
#include <folly/synchronization/Hazptr.h>
#include <folly/synchronization/SaturatingSemaphore.h>
#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Spin.h>

namespace folly {
/// UnboundedQueue supports a variety of options for unbounded,
/// dynamically expanding and shrinking queues, including variations of:
/// - Single vs. multiple producers
/// - Single vs. multiple consumers
/// - Blocking vs. spin-waiting
/// - Non-waiting, timed, and waiting consumer operations.
/// Producer operations never wait or fail (unless out-of-memory).
///
/// Template parameters:
/// - T: element type
/// - SingleProducer: true if there can be only one producer at a
///   time.
/// - SingleConsumer: true if there can be only one consumer at a
///   time.
/// - MayBlock: true if consumers may block, false if they only
///   spin. A performance tuning parameter.
/// - LgSegmentSize (default 8): Log base 2 of number of elements per
///   segment. A performance tuning parameter. See below.
/// - LgAlign (default 7): Log base 2 of alignment directive; can be
///   used to balance scalability (avoidance of false sharing) with
///   memory efficiency.
///
/// When to use UnboundedQueue:
/// - If a small bound may lead to deadlock or performance degradation
///   under bursty patterns.
/// - If there is no risk of the queue growing too much.
///
/// When not to use UnboundedQueue:
/// - If there is risk of the queue growing too much and a large bound
///   is acceptable, then use DynamicBoundedQueue.
/// - If the queue must not allocate on enqueue or it must have a
///   small bound, then use fixed-size MPMCQueue or (if non-blocking
///   SPSC) ProducerConsumerQueue.
///
/// Template Aliases:
///   USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///   UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///   USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///   UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///
/// Functions:
///   Producer operations never wait or fail (unless OOM):
///     void enqueue(const T&);
///     void enqueue(T&&);
///       Adds an element to the end of the queue.
///
///   Consumer operations:
///     void dequeue(T&);
///     T dequeue();
///       Extracts an element from the front of the queue. Waits
///       until an element is available if needed.
///     bool try_dequeue(T&);
///     folly::Optional<T> try_dequeue();
///       Tries to extract an element from the front of the queue
///       if available.
///     bool try_dequeue_until(T&, time_point& deadline);
///     folly::Optional<T> try_dequeue_until(time_point& deadline);
///       Tries to extract an element from the front of the queue
///       if available until the specified deadline.
///     bool try_dequeue_for(T&, duration&);
///     folly::Optional<T> try_dequeue_for(duration&);
///       Tries to extract an element from the front of the queue if
///       available until the expiration of the specified duration.
///     const T* try_peek();
///       Returns a pointer to the element at the front of the queue
///       if available, or nullptr if the queue is empty. Only for
///       SPSC and MPSC.
///
/// Secondary functions:
///   size_t size();
///     Returns an estimate of the size of the queue.
///   bool empty();
///     Returns true only if the queue was empty during the call.
///   Note: size() and empty() are guaranteed to be accurate only if
///   the queue is not changed concurrently.
///
/// Usage examples:
/// @code
///   /* UMPSC, doesn't block, 1024 int elements per segment */
///   UMPSCQueue<int, false, 10> q;
///   q.enqueue(1);
///   q.enqueue(2);
///   q.enqueue(3);
///   ASSERT_FALSE(q.empty());
///   ASSERT_EQ(q.size(), 3);
///   int v;
///   q.dequeue(v);
///   ASSERT_EQ(v, 1);
///   ASSERT_TRUE(q.try_dequeue(v));
///   ASSERT_EQ(v, 2);
///   ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
///   ASSERT_EQ(v, 3);
///   ASSERT_TRUE(q.empty());
///   ASSERT_EQ(q.size(), 0);
///   ASSERT_FALSE(q.try_dequeue(v));
///   ASSERT_FALSE(q.try_dequeue_for(v, microseconds(100)));
/// @endcode
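///
/// A further multi-threaded sketch (illustrative only; it assumes a
/// C++ threading environment with <thread> and <string> included and
/// is not part of this header's tests):
/// @code
///   /* UMPMC, may block, 256 string elements per segment */
///   UMPMCQueue<std::string, true> q;
///   std::thread producer([&] {
///     for (int i = 0; i < 1000; ++i) {
///       q.enqueue(std::to_string(i));
///     }
///   });
///   std::thread consumer([&] {
///     std::string s;
///     for (int i = 0; i < 1000; ++i) {
///       q.dequeue(s); // blocks until an element is available
///     }
///   });
///   producer.join();
///   consumer.join();
/// @endcode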
///
/// Design:
/// - The queue is composed of one or more segments. Each segment has
///   a fixed size of 2^LgSegmentSize entries. Each segment is used
///   exactly once.
/// - Each entry is composed of a futex and a single element.
/// - The queue contains two 64-bit ticket variables. The producer
///   ticket counts the number of producer tickets issued so far, and
///   the same for the consumer ticket. Each ticket number corresponds
///   to a specific entry in a specific segment.
/// - The queue maintains two pointers, head and tail. Head points to
///   the segment that corresponds to the current consumer
///   ticket. Similarly, the tail pointer points to the segment that
///   corresponds to the producer ticket.
/// - Segments are organized as a singly linked list.
/// - The producer with the first ticket in the current producer
///   segment has primary responsibility for allocating and linking
///   the next segment. Other producers and consumers may help do so
///   when needed if that thread is delayed.
/// - The producer with the last ticket in the current producer
///   segment is primarily responsible for advancing the tail pointer
///   to the next segment. Other producers and consumers may help do
///   so when needed if that thread is delayed.
/// - Similarly, the consumer with the last ticket in the current
///   consumer segment is primarily responsible for advancing the head
///   pointer to the next segment. Other consumers may help do so when
///   needed if that thread is delayed.
/// - The tail pointer must not lag behind the head pointer.
///   Otherwise, the algorithm cannot be certain about the removal of
///   segments and would have to incur higher costs to ensure safe
///   reclamation. Consumers must ensure that head never overtakes
///   tail.
///
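/// For illustration, with the default LgSegmentSize of 8 (256 entries
/// per segment) and the stride of 27 used by the non-SPSC
/// ticket-to-entry mapping (see index() below), consecutive tickets
/// land on well-separated entries of the same segment:
/// @code
///   ticket 256 -> segment [256..511], entry (256 * 27) % 256 == 0
///   ticket 257 -> segment [256..511], entry (257 * 27) % 256 == 27
///   ticket 258 -> segment [256..511], entry (258 * 27) % 256 == 54
/// @endcode
///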
/// Memory Usage:
/// - An empty queue contains one segment. A nonempty queue contains
///   one or two more segments than needed to fit its contents.
/// - Removed segments are not reclaimed until there are no threads,
///   producers or consumers, with references to them or their
///   predecessors. That is, a lagging thread may delay the reclamation
///   of a chain of removed segments.
/// - The template parameter LgAlign can be used to reduce memory usage
///   at the cost of an increased chance of false sharing.
///
/// Performance considerations:
/// - All operations take constant time, excluding the costs of
///   allocation, reclamation, interference from other threads, and
///   waiting for actions by other threads.
/// - In general, using the single-producer and/or single-consumer
///   variants yields better performance than the MP and MC
///   alternatives.
/// - SPSC without blocking is the fastest configuration. It doesn't
///   include any read-modify-write atomic operations, full fences, or
///   system calls in the critical path.
/// - MP adds a fetch_add to the critical path of each producer operation.
/// - MC adds a fetch_add or compare_exchange to the critical path of
///   each consumer operation.
/// - The possibility of consumers blocking, even if they never do,
///   adds a compare_exchange to the critical path of each producer
///   operation.
/// - MPMC, SPMC, MPSC require the use of a deferred reclamation
///   mechanism to guarantee that segments removed from the linked
///   list, i.e., unreachable from the head pointer, are reclaimed
///   only after they are no longer needed by any lagging producers or
///   consumers.
/// - The overheads of segment allocation and reclamation are intended
///   to be mostly out of the critical path of the queue's throughput.
/// - If the template parameter LgSegmentSize is changed, it should be
///   set adequately high to keep the amortized cost of allocation and
///   reclamation low.
/// - It is recommended to measure performance with different variants
///   when applicable, e.g., UMPMC vs UMPSC. Depending on the use
///   case, sometimes the variant with the higher sequential overhead
///   may yield better results due to, for example, a more favorable
///   producer-consumer balance or favorable timing for avoiding
///   costly blocking.

template <
    typename T,
    bool SingleProducer,
    bool SingleConsumer,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
class UnboundedQueue {
  using Ticket = uint64_t;
  class Entry;
  class Segment;

  static constexpr bool SPSC = SingleProducer && SingleConsumer;
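  // Stride is odd while SegmentSize is a power of two, so the
  // ticket-to-index mapping in index() is a bijection within each
  // segment; a stride greater than 1 spreads consecutive tickets
  // across the segment, which helps avoid false sharing between
  // entries accessed back-to-back.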
  static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
  static constexpr size_t SegmentSize = 1u << LgSegmentSize;
  static constexpr size_t Align = 1u << LgAlign;

  static_assert(
      std::is_nothrow_destructible<T>::value,
      "T must be nothrow_destructible");
  static_assert((Stride & 1) == 1, "Stride must be odd");
  static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
  static_assert(LgAlign < 16, "LgAlign must be < 16");

  using Sem = folly::SaturatingSemaphore<MayBlock, Atom>;

  struct Consumer {
    Atom<Segment*> head;
    Atom<Ticket> ticket;
    hazptr_obj_batch<Atom> batch;
    explicit Consumer(Segment* s) : head(s), ticket(0) {
      s->set_batch_no_tag(&batch); // defined in hazptr_obj
    }
  };
  struct Producer {
    Atom<Segment*> tail;
    Atom<Ticket> ticket;
    explicit Producer(Segment* s) : tail(s), ticket(0) {}
  };

  alignas(Align) Consumer c_;
  alignas(Align) Producer p_;

 public:
  /** constructor */
  UnboundedQueue()
      : c_(new Segment(0)), p_(c_.head.load(std::memory_order_relaxed)) {}

  /** destructor */
  ~UnboundedQueue() {
    cleanUpRemainingItems();
    c_.batch.shutdown_and_reclaim();
    reclaimRemainingSegments();
  }

  /** enqueue */
  FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
    enqueueImpl(arg);
  }

  FOLLY_ALWAYS_INLINE void enqueue(T&& arg) {
    enqueueImpl(std::move(arg));
  }

  /** dequeue */
  FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
    dequeueImpl(item);
  }

  FOLLY_ALWAYS_INLINE T dequeue() noexcept {
    T item;
    dequeueImpl(item);
    return item;
  }

  /** try_dequeue */
  FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
    auto o = try_dequeue();
    if (LIKELY(o.has_value())) {
      item = std::move(*o);
      return true;
    }
    return false;
  }

  FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue() noexcept {
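    // A deadline in the distant past makes the underlying wait a
    // non-waiting attempt (the "non-waiting" consumer operation
    // described in the class documentation).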
    return tryDequeueUntil(std::chrono::steady_clock::time_point::min());
  }

  /** try_dequeue_until */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE bool try_dequeue_until(
      T& item,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    folly::Optional<T> o = try_dequeue_until(deadline);

    if (LIKELY(o.has_value())) {
      item = std::move(*o);
      return true;
    }

    return false;
  }

  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_until(
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    return tryDequeueUntil(deadline);
  }

  /** try_dequeue_for */
  template <typename Rep, typename Period>
  FOLLY_ALWAYS_INLINE bool try_dequeue_for(
      T& item,
      const std::chrono::duration<Rep, Period>& duration) noexcept {
    folly::Optional<T> o = try_dequeue_for(duration);

    if (LIKELY(o.has_value())) {
      item = std::move(*o);
      return true;
    }

    return false;
  }

  template <typename Rep, typename Period>
  FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_for(
      const std::chrono::duration<Rep, Period>& duration) noexcept {
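    // Fast path: attempt a non-waiting dequeue first, so the clock is
    // read and a deadline computed only when the queue appears empty.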
    folly::Optional<T> o = try_dequeue();
    if (LIKELY(o.has_value())) {
      return o;
    }
    return tryDequeueUntil(std::chrono::steady_clock::now() + duration);
  }

  /** try_peek */
  FOLLY_ALWAYS_INLINE const T* try_peek() noexcept {
    /* This function is supported only for USPSC and UMPSC queues. */
    DCHECK(SingleConsumer);
    return tryPeekUntil(std::chrono::steady_clock::time_point::min());
  }

  /** size */
  size_t size() const noexcept {
    auto p = producerTicket();
    auto c = consumerTicket();
    return p > c ? p - c : 0;
  }

  /** empty */
  bool empty() const noexcept {
    auto c = consumerTicket();
    auto p = producerTicket();
    return p <= c;
  }

 private:
  /** enqueueImpl */
  template <typename Arg>
  FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
    if (SPSC) {
      Segment* s = tail();
      enqueueCommon(s, std::forward<Arg>(arg));
    } else {
      // Using hazptr_holder instead of hazptr_local because it is
      // possible that the T ctor happens to use hazard pointers.
      hazptr_holder<Atom> hptr;
      Segment* s = hptr.get_protected(p_.tail);
      enqueueCommon(s, std::forward<Arg>(arg));
    }
  }

  /** enqueueCommon */
  template <typename Arg>
  FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
    Ticket t = fetchIncrementProducerTicket();
    if (!SingleProducer) {
      s = findSegment(s, t);
    }
    DCHECK_GE(t, s->minTicket());
    DCHECK_LT(t, s->minTicket() + SegmentSize);
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    e.putItem(std::forward<Arg>(arg));
    if (responsibleForAlloc(t)) {
      allocNextSegment(s);
    }
    if (responsibleForAdvance(t)) {
      advanceTail(s);
    }
  }

  /** dequeueImpl */
  FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept {
    if (SPSC) {
      Segment* s = head();
      dequeueCommon(s, item);
    } else {
      // Using hazptr_holder instead of hazptr_local because it is
      // possible to call the T dtor and it may happen to use hazard
      // pointers.
      hazptr_holder<Atom> hptr;
      Segment* s = hptr.get_protected(c_.head);
      dequeueCommon(s, item);
    }
  }

  /** dequeueCommon */
  FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept {
    Ticket t = fetchIncrementConsumerTicket();
    if (!SingleConsumer) {
      s = findSegment(s, t);
    }
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    e.takeItem(item);
    if (responsibleForAdvance(t)) {
      advanceHead(s);
    }
  }

  /** tryDequeueUntil */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntil(
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    if (SingleConsumer) {
      Segment* s = head();
      return tryDequeueUntilSC(s, deadline);
    } else {
      // Using hazptr_holder instead of hazptr_local because it is
      // possible to call ~T() and it may happen to use hazard pointers.
      hazptr_holder<Atom> hptr;
      Segment* s = hptr.get_protected(c_.head);
      return tryDequeueUntilMC(s, deadline);
    }
  }

  /** tryDequeueUntilSC */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilSC(
      Segment* s,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    Ticket t = consumerTicket();
    DCHECK_GE(t, s->minTicket());
    DCHECK_LT(t, (s->minTicket() + SegmentSize));
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
      return folly::Optional<T>();
    }
    setConsumerTicket(t + 1);
    auto ret = e.takeItem();
    if (responsibleForAdvance(t)) {
      advanceHead(s);
    }
    return ret;
  }

  /** tryDequeueUntilMC */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilMC(
      Segment* s,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    while (true) {
      Ticket t = consumerTicket();
      if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
        s = getAllocNextSegment(s, t);
        DCHECK(s);
        continue;
      }
      size_t idx = index(t);
      Entry& e = s->entry(idx);
      if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
        return folly::Optional<T>();
      }
      if (!c_.ticket.compare_exchange_weak(
              t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
        continue;
      }
      auto ret = e.takeItem();
      if (responsibleForAdvance(t)) {
        advanceHead(s);
      }
      return ret;
    }
  }

  /** tryDequeueWaitElem */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
      Entry& e,
      Ticket t,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    if (LIKELY(e.tryWaitUntil(deadline))) {
      return true;
    }
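    // The wait timed out, but if a producer has already claimed ticket
    // t, the matching element is in flight; report success and let the
    // caller complete the hand-off instead of failing spuriously.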
    return t < producerTicket();
  }

  /** tryPeekUntil */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE const T* tryPeekUntil(
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    Segment* s = head();
    Ticket t = consumerTicket();
    DCHECK_GE(t, s->minTicket());
    DCHECK_LT(t, (s->minTicket() + SegmentSize));
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
      return nullptr;
    }
    return e.peekItem();
  }

  /** findSegment */
  FOLLY_ALWAYS_INLINE
  Segment* findSegment(Segment* s, const Ticket t) noexcept {
    while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
      s = getAllocNextSegment(s, t);
      DCHECK(s);
    }
    return s;
  }

  /** getAllocNextSegment */
  Segment* getAllocNextSegment(Segment* s, Ticket t) noexcept {
    Segment* next = s->nextSegment();
    if (!next) {
      DCHECK_GE(t, s->minTicket() + SegmentSize);
      auto diff = t - (s->minTicket() + SegmentSize);
      if (diff > 0) {
        auto dur = std::chrono::microseconds(diff);
        auto deadline = std::chrono::steady_clock::now() + dur;
        WaitOptions opt;
        opt.spin_max(dur);
        detail::spin_pause_until(
            deadline, opt, [s] { return s->nextSegment(); });
        next = s->nextSegment();
        if (next) {
          return next;
        }
      }
      next = allocNextSegment(s);
    }
    DCHECK(next);
    return next;
  }

  /** allocNextSegment */
  Segment* allocNextSegment(Segment* s) {
    auto t = s->minTicket() + SegmentSize;
    Segment* next = new Segment(t);
    next->set_batch_no_tag(&c_.batch); // defined in hazptr_obj
    next->acquire_ref_safe(); // defined in hazptr_obj_base_linked
    if (!s->casNextSegment(next)) {
      delete next;
      next = s->nextSegment();
    }
    DCHECK(next);
    return next;
  }

  /** advanceTail */
  void advanceTail(Segment* s) noexcept {
    if (SPSC) {
      Segment* next = s->nextSegment();
      DCHECK(next);
      setTail(next);
    } else {
      Ticket t = s->minTicket() + SegmentSize;
      advanceTailToTicket(t);
    }
  }

  /** advanceTailToTicket */
  void advanceTailToTicket(Ticket t) noexcept {
    Segment* s = tail();
    while (s->minTicket() < t) {
      Segment* next = s->nextSegment();
      if (!next) {
        next = allocNextSegment(s);
      }
      DCHECK(next);
      casTail(s, next);
      s = tail();
    }
  }

  /** advanceHead */
  void advanceHead(Segment* s) noexcept {
    if (SPSC) {
      while (tail() == s) {
        /* Wait for producer to advance tail. */
        asm_volatile_pause();
      }
      Segment* next = s->nextSegment();
      DCHECK(next);
      setHead(next);
      reclaimSegment(s);
    } else {
      Ticket t = s->minTicket() + SegmentSize;
      advanceHeadToTicket(t);
    }
  }

  /** advanceHeadToTicket */
  void advanceHeadToTicket(Ticket t) noexcept {
    /* Tail must not lag behind head. Otherwise, the algorithm cannot
       be certain about the removal of segments. */
    advanceTailToTicket(t);
    Segment* s = head();
    if (SingleConsumer) {
      DCHECK_EQ(s->minTicket() + SegmentSize, t);
      Segment* next = s->nextSegment();
      DCHECK(next);
      setHead(next);
      reclaimSegment(s);
    } else {
      while (s->minTicket() < t) {
        Segment* next = s->nextSegment();
        DCHECK(next);
        if (casHead(s, next)) {
          reclaimSegment(s);
          s = next;
        }
      }
    }
  }

  /** reclaimSegment */
  void reclaimSegment(Segment* s) noexcept {
    if (SPSC) {
      delete s;
    } else {
      s->retire(); // defined in hazptr_obj_base_linked
    }
  }

  /** cleanUpRemainingItems */
  void cleanUpRemainingItems() {
    auto end = producerTicket();
    auto s = head();
    for (auto t = consumerTicket(); t < end; ++t) {
      if (t >= s->minTicket() + SegmentSize) {
        s = s->nextSegment();
      }
      DCHECK_LT(t, (s->minTicket() + SegmentSize));
      auto idx = index(t);
      auto& e = s->entry(idx);
      e.destroyItem();
    }
  }

  /** reclaimRemainingSegments */
  void reclaimRemainingSegments() {
    auto h = head();
    auto s = h->nextSegment();
    h->setNextSegment(nullptr);
    reclaimSegment(h);
    while (s) {
      auto next = s->nextSegment();
      delete s;
      s = next;
    }
  }

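  // Maps a ticket to an entry index within its segment. With the
  // non-SPSC stride of 27 and SegmentSize 256, tickets 256, 257, 258
  // map to entries 0, 27, and 54 of the same segment.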
  FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
    return (t * Stride) & (SegmentSize - 1);
  }

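  // True for the first ticket of each segment; its holder allocates
  // and links the next segment (see the Design notes above).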
  FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept {
    return (t & (SegmentSize - 1)) == 0;
  }

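  // True for the last ticket of each segment; its holder advances the
  // tail (producer side) or head (consumer side) to the next segment.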
  FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept {
    return (t & (SegmentSize - 1)) == (SegmentSize - 1);
  }

  FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
    return c_.head.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
    return p_.tail.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
    return p_.ticket.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
    return c_.ticket.load(std::memory_order_acquire);
  }

  void setHead(Segment* s) noexcept {
    DCHECK(SingleConsumer);
    c_.head.store(s, std::memory_order_relaxed);
  }

  void setTail(Segment* s) noexcept {
    DCHECK(SPSC);
    p_.tail.store(s, std::memory_order_release);
  }

  bool casHead(Segment*& s, Segment* next) noexcept {
    DCHECK(!SingleConsumer);
    return c_.head.compare_exchange_strong(
        s, next, std::memory_order_release, std::memory_order_acquire);
  }

  void casTail(Segment*& s, Segment* next) noexcept {
    DCHECK(!SPSC);
    p_.tail.compare_exchange_strong(
        s, next, std::memory_order_release, std::memory_order_relaxed);
  }

  FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
    p_.ticket.store(t, std::memory_order_release);
  }

  FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
    c_.ticket.store(t, std::memory_order_release);
  }

  FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
    if (SingleConsumer) {
      Ticket oldval = consumerTicket();
      setConsumerTicket(oldval + 1);
      return oldval;
    } else { // MC
      return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
    }
  }

  FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
    if (SingleProducer) {
      Ticket oldval = producerTicket();
      setProducerTicket(oldval + 1);
      return oldval;
    } else { // MP
      return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
    }
  }

  /**
   * Entry
   */
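  // Each Entry pairs a flag (a saturating semaphore; futex-based when
  // MayBlock is true) with storage for one element: the producer
  // constructs the element and posts the flag, and consumers wait on
  // the flag before taking the element.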
  class Entry {
    Sem flag_;
    aligned_storage_for_t<T> item_;

   public:
    template <typename Arg>
    FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
      new (&item_) T(std::forward<Arg>(arg));
      flag_.post();
    }

    FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
      flag_.wait();
      getItem(item);
    }

    FOLLY_ALWAYS_INLINE folly::Optional<T> takeItem() noexcept {
      flag_.wait();
      return getItem();
    }

    FOLLY_ALWAYS_INLINE const T* peekItem() noexcept {
      flag_.wait();
      return itemPtr();
    }

    template <typename Clock, typename Duration>
    FOLLY_EXPORT FOLLY_ALWAYS_INLINE bool tryWaitUntil(
        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
      // wait-options from benchmarks on contended queues:
      static constexpr auto const opt =
          Sem::wait_options().spin_max(std::chrono::microseconds(10));
      return flag_.try_wait_until(deadline, opt);
    }

    FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
      itemPtr()->~T();
    }

   private:
    FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
      item = std::move(*(itemPtr()));
      destroyItem();
    }

    FOLLY_ALWAYS_INLINE folly::Optional<T> getItem() noexcept {
      folly::Optional<T> ret = std::move(*(itemPtr()));
      destroyItem();
      return ret;
    }

    FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
      return static_cast<T*>(static_cast<void*>(&item_));
    }
  }; // Entry

  /**
   * Segment
   */
  class Segment : public hazptr_obj_base_linked<Segment, Atom> {
    Atom<Segment*> next_{nullptr};
    const Ticket min_;
    alignas(Align) Entry b_[SegmentSize];

   public:
    explicit Segment(const Ticket t) noexcept : min_(t) {}

    Segment* nextSegment() const noexcept {
      return next_.load(std::memory_order_acquire);
    }

    void setNextSegment(Segment* next) {
      next_.store(next, std::memory_order_relaxed);
    }

    bool casNextSegment(Segment* next) noexcept {
      Segment* expected = nullptr;
      return next_.compare_exchange_strong(
          expected, next, std::memory_order_release, std::memory_order_relaxed);
    }

    FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
      DCHECK_EQ((min_ & (SegmentSize - 1)), Ticket(0));
      return min_;
    }

    FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
      return b_[index];
    }

    template <typename S>
    void push_links(bool m, S& s) {
      if (m == false) { // next_ is immutable
        auto p = nextSegment();
        if (p) {
          s.push(p);
        }
      }
    }
  }; // Segment

}; // UnboundedQueue

/* Aliases */

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPSCQueue =
    UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPSCQueue =
    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPMCQueue =
    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPMCQueue =
    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;

} // namespace folly