1/*
2 * Copyright 2013-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <algorithm>
20#include <atomic>
21#include <cassert>
22#include <cstring>
23#include <limits>
24#include <type_traits>
25
26#include <boost/noncopyable.hpp>
27
28#include <folly/Traits.h>
29#include <folly/concurrency/CacheLocality.h>
30#include <folly/detail/TurnSequencer.h>
31#include <folly/portability/Unistd.h>
32
33namespace folly {
34
35namespace detail {
36
37template <typename T, template <typename> class Atom>
38struct SingleElementQueue;
39
40template <typename T>
41class MPMCPipelineStageImpl;
42
43/// MPMCQueue base CRTP template
44template <typename>
45class MPMCQueueBase;
46
47} // namespace detail
48
49/// MPMCQueue<T> is a high-performance bounded concurrent queue that
50/// supports multiple producers, multiple consumers, and optional blocking.
51/// The queue has a fixed capacity, for which all memory will be allocated
52/// up front. The bulk of the work of enqueuing and dequeuing can be
53/// performed in parallel.
54///
55/// MPMCQueue is linearizable. That means that if a call to write(A)
56/// returns before a call to write(B) begins, then A will definitely end up
57/// in the queue before B, and if a call to read(X) returns before a call
58/// to read(Y) is started, then X will be something from earlier in the
59/// queue than Y. This also means that if a read call returns a value, you
60/// can be sure that all previous elements of the queue have been assigned
61/// a reader (that reader might not yet have returned, but it exists).
62///
63/// The underlying implementation uses a ticket dispenser for the head and
64/// the tail, spreading accesses across N single-element queues to produce
65/// a queue with capacity N. The ticket dispensers use atomic increment,
66/// which is more robust to contention than a CAS loop. Each of the
67/// single-element queues uses its own CAS to serialize access, with an
68/// adaptive spin cutoff. When spinning fails on a single-element queue
69/// it uses futex()'s _BITSET operations to reduce unnecessary wakeups
70/// even if multiple waiters are present on an individual queue (such as
71/// when the MPMCQueue's capacity is smaller than the number of enqueuers
72/// or dequeuers).
73///
74/// In benchmarks (contained in tao/queues/ConcurrentQueueTests)
75/// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
76/// than any of the alternatives present in fbcode, for both small (~10)
77/// and large capacities. In these benchmarks it is also faster than
78/// tbb::concurrent_bounded_queue for all configurations. When there are
79/// many more threads than cores, MPMCQueue is _much_ faster than the tbb
80/// queue because it uses futex() to block and unblock waiting threads,
81/// rather than spinning with sched_yield.
82///
83/// NOEXCEPT INTERACTION: tl;dr: If it compiles you're fine. Ticket-based
84/// queues separate the assignment of queue positions from the actual
85/// construction of the in-queue elements, which means that the T
86/// constructor used during enqueue must not throw an exception. This is
87/// enforced at compile time using type traits, which requires that T be
88/// adorned with accurate noexcept information. If your type does not
89/// use noexcept, you will have to wrap it in something that provides
90/// the guarantee. We provide an alternate safe implementation for types
91/// that don't use noexcept but that are marked folly::IsRelocatable
92/// and std::is_nothrow_constructible, which is common for folly types.
93/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
94/// then your type can be put in MPMCQueue.
95///
96/// If you have a pool of N queue consumers that you want to shut down
97/// after the queue has drained, one way is to enqueue N sentinel values
98/// to the queue. If the producer doesn't know how many consumers there
99/// are you can enqueue one sentinel and then have each consumer requeue
100/// two sentinels after it receives it (by requeuing 2 the shutdown can
101/// complete in O(log P) time instead of O(P)).
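///
/// A minimal usage sketch (the element type, capacity, and values here are
/// illustrative):
///
///   folly::MPMCQueue<int> q(100);  // bounded at 100 elements
///   q.blockingWrite(42);           // blocks while the queue is full
///   int v;
///   q.blockingRead(v);             // blocks while the queue is empty
///   if (!q.write(43)) { /* queue was full; 43 was not enqueued */ }
///   if (!q.read(v)) { /* queue was empty */ }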
102template <
103 typename T,
104 template <typename> class Atom = std::atomic,
105 bool Dynamic = false>
106class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> {
107 friend class detail::MPMCPipelineStageImpl<T>;
108 using Slot = detail::SingleElementQueue<T, Atom>;
109
110 public:
111 explicit MPMCQueue(size_t queueCapacity)
112 : detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>>(queueCapacity) {
113 this->stride_ = this->computeStride(queueCapacity);
114 this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
115 }
116
117 MPMCQueue() noexcept {}
118};
119
120/// The dynamic version of MPMCQueue allows dynamic expansion of queue
121/// capacity, such that a queue may start with a smaller capacity than
122/// specified and expand only if needed. Users may optionally specify
123/// the initial capacity and the expansion multiplier.
124///
125/// The design uses a seqlock to enforce mutual exclusion among
126/// expansion attempts. Regular operations read up-to-date queue
127/// information (slots array, capacity, stride) inside read-only
128/// seqlock sections, which are unimpeded when no expansion is in
129/// progress.
130///
131/// An expansion computes a new capacity, allocates a new slots array,
132/// and updates stride. No information needs to be copied from the
133/// current slots array to the new one. When this happens, new slots
134/// will not have sequence numbers that match ticket numbers. The
135/// expansion needs to compute a ticket offset such that operations
136/// that use new arrays can adjust the calculations of slot indexes
137/// and sequence numbers that take into account that the new slots
138/// start with sequence numbers of zero. The current ticket offset is
139/// packed with the seqlock in an atomic 64-bit integer. The initial
140/// offset is zero.
141///
142/// Lagging write and read operations with tickets lower than the
143/// ticket offset of the current slots array (i.e., the minimum ticket
144/// number that can be served by the current array) must use earlier
145/// closed arrays instead of the current one. Information about closed
146/// slots arrays (array address, capacity, stride, and offset) is
147/// maintained in a logarithmic-sized structure. Each entry in that
148/// structure never needs to be changed once set. The number of closed
149/// arrays is half the value of the seqlock (when unlocked).
150///
151/// The acquisition of the seqlock to perform an expansion does not
152/// prevent the issuing of new push and pop tickets concurrently. The
153/// expansion must set the new ticket offset to a value that couldn't
154/// have been issued to an operation that has already gone through a
155/// seqlock read-only section (and hence obtained information for
156/// older closed arrays).
157///
158/// Note that the total queue capacity can temporarily exceed the
159/// specified capacity when there are lagging consumers that haven't
160/// yet consumed all the elements in closed arrays. Users should not
161/// rely on the capacity of dynamic queues for synchronization, e.g.,
162/// they should not expect that a thread will definitely block on a
163/// call to blockingWrite() when the queue size is known to be equal
164/// to its capacity.
165///
166/// Note that some writeIfNotFull() and tryWriteUntil() operations may
167/// fail even if the size of the queue is less than its maximum
168/// capacity and despite the success of expansion, if the operation
169/// happens to acquire a ticket that belongs to a closed array. This
170/// is a transient condition. Typically, one or two ticket values may
171/// be subject to such a condition per expansion.
172///
173/// The dynamic version is a partial specialization of MPMCQueue with
174/// Dynamic == true
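///
/// For example (a sketch; the numbers are illustrative), a dynamic queue
/// bounded at 1000 elements that starts with room for 10 and grows by a
/// factor of 10 per expansion can be declared as:
///
///   folly::MPMCQueue<int, std::atomic, true> q(1000);
///   // or, spelling out (max capacity, initial capacity, multiplier):
///   folly::MPMCQueue<int, std::atomic, true> q2(1000, 10, 10);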
175template <typename T, template <typename> class Atom>
176class MPMCQueue<T, Atom, true>
177 : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> {
178 friend class detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>;
179 using Slot = detail::SingleElementQueue<T, Atom>;
180
181 struct ClosedArray {
182 uint64_t offset_{0};
183 Slot* slots_{nullptr};
184 size_t capacity_{0};
185 int stride_{0};
186 };
187
188 public:
189 explicit MPMCQueue(size_t queueCapacity)
190 : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
191 size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
192 initQueue(cap, kDefaultExpansionMultiplier);
193 }
194
195 explicit MPMCQueue(
196 size_t queueCapacity,
197 size_t minCapacity,
198 size_t expansionMultiplier)
199 : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
200 minCapacity = std::max<size_t>(1, minCapacity);
201 size_t cap = std::min<size_t>(minCapacity, queueCapacity);
202 expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
203 initQueue(cap, expansionMultiplier);
204 }
205
206 MPMCQueue() noexcept {
207 dmult_ = 0;
208 closed_ = nullptr;
209 }
210
211 MPMCQueue(MPMCQueue<T, Atom, true>&& rhs) noexcept {
212 this->capacity_ = rhs.capacity_;
213 new (&this->dslots_)
214 Atom<Slot*>(rhs.dslots_.load(std::memory_order_relaxed));
215 new (&this->dstride_)
216 Atom<int>(rhs.dstride_.load(std::memory_order_relaxed));
217 this->dstate_.store(
218 rhs.dstate_.load(std::memory_order_relaxed), std::memory_order_relaxed);
219 this->dcapacity_.store(
220 rhs.dcapacity_.load(std::memory_order_relaxed),
221 std::memory_order_relaxed);
222 this->pushTicket_.store(
223 rhs.pushTicket_.load(std::memory_order_relaxed),
224 std::memory_order_relaxed);
225 this->popTicket_.store(
226 rhs.popTicket_.load(std::memory_order_relaxed),
227 std::memory_order_relaxed);
228 this->pushSpinCutoff_.store(
229 rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
230 std::memory_order_relaxed);
231 this->popSpinCutoff_.store(
232 rhs.popSpinCutoff_.load(std::memory_order_relaxed),
233 std::memory_order_relaxed);
234 dmult_ = rhs.dmult_;
235 closed_ = rhs.closed_;
236
237 rhs.capacity_ = 0;
238 rhs.dslots_.store(nullptr, std::memory_order_relaxed);
239 rhs.dstride_.store(0, std::memory_order_relaxed);
240 rhs.dstate_.store(0, std::memory_order_relaxed);
241 rhs.dcapacity_.store(0, std::memory_order_relaxed);
242 rhs.pushTicket_.store(0, std::memory_order_relaxed);
243 rhs.popTicket_.store(0, std::memory_order_relaxed);
244 rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
245 rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
246 rhs.dmult_ = 0;
247 rhs.closed_ = nullptr;
248 }
249
250 MPMCQueue<T, Atom, true> const& operator=(MPMCQueue<T, Atom, true>&& rhs) {
251 if (this != &rhs) {
252 this->~MPMCQueue();
253 new (this) MPMCQueue(std::move(rhs));
254 }
255 return *this;
256 }
257
258 ~MPMCQueue() {
259 if (closed_ != nullptr) {
260 for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
261 delete[] closed_[i].slots_;
262 }
263 delete[] closed_;
264 }
265 using AtomInt = Atom<int>;
266 this->dstride_.~AtomInt();
267 using AtomSlot = Atom<Slot*>;
268 // Sort of a hack to get ~MPMCQueueBase to free dslots_
269 auto slots = this->dslots_.load();
270 this->dslots_.~AtomSlot();
271 this->slots_ = slots;
272 }
273
274 size_t allocatedCapacity() const noexcept {
275 return this->dcapacity_.load(std::memory_order_relaxed);
276 }
277
278 template <typename... Args>
279 void blockingWrite(Args&&... args) noexcept {
280 uint64_t ticket = this->pushTicket_++;
281 Slot* slots;
282 size_t cap;
283 int stride;
284 uint64_t state;
285 uint64_t offset;
286 do {
287 if (!trySeqlockReadSection(state, slots, cap, stride)) {
288 asm_volatile_pause();
289 continue;
290 }
291 if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
292 // There was an expansion after this ticket was issued.
293 break;
294 }
295 if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
296 this->turn(ticket - offset, cap))) {
297 // A slot is ready. No need to expand.
298 break;
299 } else if (
300 this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
301 // May block, but a pop is in progress. No need to expand.
302 // Get seqlock read section info again in case an expansion
303 // occurred with an equal or higher ticket.
304 continue;
305 } else {
306 // May block. See if we can expand.
307 if (tryExpand(state, cap)) {
308 // This or another thread started an expansion. Get updated info.
309 continue;
310 } else {
311 // Can't expand.
312 break;
313 }
314 }
315 } while (true);
316 this->enqueueWithTicketBase(
317 ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
318 }
319
320 void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
321 ticket = this->popTicket_++;
322 Slot* slots;
323 size_t cap;
324 int stride;
325 uint64_t state;
326 uint64_t offset;
327 while (!trySeqlockReadSection(state, slots, cap, stride)) {
328 asm_volatile_pause();
329 }
330 // If there was an expansion after the corresponding push ticket
331 // was issued, adjust accordingly
332 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
333 this->dequeueWithTicketBase(ticket - offset, slots, cap, stride, elem);
334 }
335
336 private:
337 enum {
338 kSeqlockBits = 6,
339 kDefaultMinDynamicCapacity = 10,
340 kDefaultExpansionMultiplier = 10,
341 };
342
343 size_t dmult_;
344
345 // Info about closed slots arrays for use by lagging operations
346 ClosedArray* closed_;
347
348 void initQueue(const size_t cap, const size_t mult) {
349 new (&this->dstride_) Atom<int>(this->computeStride(cap));
350 Slot* slots = new Slot[cap + 2 * this->kSlotPadding];
351 new (&this->dslots_) Atom<Slot*>(slots);
352 this->dstate_.store(0);
353 this->dcapacity_.store(cap);
354 dmult_ = mult;
355 size_t maxClosed = 0;
356 for (size_t expanded = cap; expanded < this->capacity_; expanded *= mult) {
357 ++maxClosed;
358 }
359 closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
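    // For example, with capacity_ = 1000, cap = 10, and mult = 10, the loop
    // above counts expansions 10 -> 100 -> 1000, so maxClosed == 2 and at
    // most two closed slot arrays ever need to be recorded here.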
360 }
361
362 bool tryObtainReadyPushTicket(
363 uint64_t& ticket,
364 Slot*& slots,
365 size_t& cap,
366 int& stride) noexcept {
367 uint64_t state;
368 do {
369 ticket = this->pushTicket_.load(std::memory_order_acquire); // A
370 if (!trySeqlockReadSection(state, slots, cap, stride)) {
371 asm_volatile_pause();
372 continue;
373 }
374
375 // If there was an expansion with offset greater than this ticket,
376 // adjust accordingly
377 uint64_t offset;
378 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
379
380 if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
381 this->turn(ticket - offset, cap))) {
382 // A slot is ready.
383 if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
384 // Adjust ticket
385 ticket -= offset;
386 return true;
387 } else {
388 continue;
389 }
390 } else {
391 if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
392 // Try again. Ticket changed.
393 continue;
394 }
395 // Likely to block.
396 // Try to expand unless the ticket is for a closed array
397 if (offset == getOffset(state)) {
398 if (tryExpand(state, cap)) {
399 // This or another thread started an expansion. Get up-to-date info.
400 continue;
401 }
402 }
403 return false;
404 }
405 } while (true);
406 }
407
408 bool tryObtainPromisedPushTicket(
409 uint64_t& ticket,
410 Slot*& slots,
411 size_t& cap,
412 int& stride) noexcept {
413 uint64_t state;
414 do {
415 ticket = this->pushTicket_.load(std::memory_order_acquire);
416 auto numPops = this->popTicket_.load(std::memory_order_acquire);
417 if (!trySeqlockReadSection(state, slots, cap, stride)) {
418 asm_volatile_pause();
419 continue;
420 }
421
422 const auto curCap = cap;
423 // If there was an expansion with offset greater than this ticket,
424 // adjust accordingly
425 uint64_t offset;
426 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
427
428 int64_t n = ticket - numPops;
429
430 if (n >= static_cast<ssize_t>(cap)) {
431 if ((cap == curCap) && tryExpand(state, cap)) {
432 // This or another thread started an expansion. Start over.
433 continue;
434 }
435 // Can't expand.
436 ticket -= offset;
437 return false;
438 }
439
440 if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
441 // Adjust ticket
442 ticket -= offset;
443 return true;
444 }
445 } while (true);
446 }
447
448 bool tryObtainReadyPopTicket(
449 uint64_t& ticket,
450 Slot*& slots,
451 size_t& cap,
452 int& stride) noexcept {
453 uint64_t state;
454 do {
455 ticket = this->popTicket_.load(std::memory_order_relaxed);
456 if (!trySeqlockReadSection(state, slots, cap, stride)) {
457 asm_volatile_pause();
458 continue;
459 }
460
461 // If there was an expansion after the corresponding push ticket
462 // was issued, adjust accordingly
463 uint64_t offset;
464 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
465
466 if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
467 this->turn(ticket - offset, cap))) {
468 if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
469 // Adjust ticket
470 ticket -= offset;
471 return true;
472 }
473 } else {
474 return false;
475 }
476 } while (true);
477 }
478
479 bool tryObtainPromisedPopTicket(
480 uint64_t& ticket,
481 Slot*& slots,
482 size_t& cap,
483 int& stride) noexcept {
484 uint64_t state;
485 do {
486 ticket = this->popTicket_.load(std::memory_order_acquire);
487 auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
488 if (!trySeqlockReadSection(state, slots, cap, stride)) {
489 asm_volatile_pause();
490 continue;
491 }
492
493 uint64_t offset;
494 // If there was an expansion after the corresponding push
495 // ticket was issued, adjust accordingly
496 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
497
498 if (ticket >= numPushes) {
499 ticket -= offset;
500 return false;
501 }
502 if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
503 ticket -= offset;
504 return true;
505 }
506 } while (true);
507 }
508
509 /// Enqueues an element with a specific ticket number
510 template <typename... Args>
511 void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
512 Slot* slots;
513 size_t cap;
514 int stride;
515 uint64_t state;
516 uint64_t offset;
517
518 while (!trySeqlockReadSection(state, slots, cap, stride)) {
519 }
520
521 // If there was an expansion after this ticket was issued, adjust
522 // accordingly
523 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
524
525 this->enqueueWithTicketBase(
526 ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
527 }
528
529 uint64_t getOffset(const uint64_t state) const noexcept {
530 return state >> kSeqlockBits;
531 }
532
533 int getNumClosed(const uint64_t state) const noexcept {
534 return (state & ((1 << kSeqlockBits) - 1)) >> 1;
535 }
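
  // A sketch of the dstate_ packing implied by getOffset(), getNumClosed(),
  // and tryExpand() below: bit 0 is the seqlock write bit, the remaining low
  // kSeqlockBits - 1 bits hold twice the number of closed arrays, and the
  // high bits hold the current ticket offset, i.e.
  //   state == (ticketOffset << kSeqlockBits) | (numClosed << 1) | lockBit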
536
537 /// Try to expand the queue. Returns true if this expansion was
538 /// successful or a concurrent expansion is in progress. Returns
539 /// false if the queue has reached its maximum capacity or
540 /// allocation has failed.
541 bool tryExpand(const uint64_t state, const size_t cap) noexcept {
542 if (cap == this->capacity_) {
543 return false;
544 }
545 // Acquire seqlock
546 uint64_t oldval = state;
547 assert((state & 1) == 0);
548 if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
549 assert(cap == this->dcapacity_.load());
550 uint64_t ticket =
551 1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
552 size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
553 Slot* newSlots =
554 new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
555 if (newSlots == nullptr) {
556 // Expansion failed. Restore the seqlock
557 this->dstate_.store(state);
558 return false;
559 }
560 // Successful expansion
561 // calculate the current ticket offset
562 uint64_t offset = getOffset(state);
563 // calculate index in closed array
564 int index = getNumClosed(state);
565 assert((index << 1) < (1 << kSeqlockBits));
566 // fill the info for the closed slots array
567 closed_[index].offset_ = offset;
568 closed_[index].slots_ = this->dslots_.load();
569 closed_[index].capacity_ = cap;
570 closed_[index].stride_ = this->dstride_.load();
571 // update the new slots array info
572 this->dslots_.store(newSlots);
573 this->dcapacity_.store(newCapacity);
574 this->dstride_.store(this->computeStride(newCapacity));
575 // Release the seqlock and record the new ticket offset
576 this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
577 return true;
578 } else { // failed to acquire seqlock
579 // Someone acquired the seqlock. Go back to the caller and get
580 // up-to-date info.
581 return true;
582 }
583 }
584
585 /// Seqlock read-only section
586 bool trySeqlockReadSection(
587 uint64_t& state,
588 Slot*& slots,
589 size_t& cap,
590 int& stride) noexcept {
591 state = this->dstate_.load(std::memory_order_acquire);
592 if (state & 1) {
593 // Locked.
594 return false;
595 }
596 // Start read-only section.
597 slots = this->dslots_.load(std::memory_order_relaxed);
598 cap = this->dcapacity_.load(std::memory_order_relaxed);
599 stride = this->dstride_.load(std::memory_order_relaxed);
600 // End of read-only section. Validate seqlock.
601 std::atomic_thread_fence(std::memory_order_acquire);
602 return (state == this->dstate_.load(std::memory_order_relaxed));
603 }
604
605 /// If there was an expansion after ticket was issued, update local variables
606 /// of the lagging operation using the most recent closed array with
607 /// offset <= ticket and return true. Otherwise, return false.
608 bool maybeUpdateFromClosed(
609 const uint64_t state,
610 const uint64_t ticket,
611 uint64_t& offset,
612 Slot*& slots,
613 size_t& cap,
614 int& stride) noexcept {
615 offset = getOffset(state);
616 if (ticket >= offset) {
617 return false;
618 }
619 for (int i = getNumClosed(state) - 1; i >= 0; --i) {
620 offset = closed_[i].offset_;
621 if (offset <= ticket) {
622 slots = closed_[i].slots_;
623 cap = closed_[i].capacity_;
624 stride = closed_[i].stride_;
625 return true;
626 }
627 }
628 // A closed array with offset <= ticket should have been found
629 assert(false);
630 return false;
631 }
632};
633
634namespace detail {
635
636/// CRTP specialization of MPMCQueueBase
637template <
638 template <typename T, template <typename> class Atom, bool Dynamic>
639 class Derived,
640 typename T,
641 template <typename> class Atom,
642 bool Dynamic>
643class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
644 // Note: Using CRTP static casts in several functions of this base
645 // template instead of making called functions virtual or duplicating
646 // the code of calling functions in the derived partially specialized
647 // template
648
649 static_assert(
650 std::is_nothrow_constructible<T, T&&>::value ||
651 folly::IsRelocatable<T>::value,
652 "T must be relocatable or have a noexcept move constructor");
653
654 public:
655 typedef T value_type;
656
657 using Slot = detail::SingleElementQueue<T, Atom>;
658
659 explicit MPMCQueueBase(size_t queueCapacity)
660 : capacity_(queueCapacity),
661 dstate_(0),
662 dcapacity_(0),
663 pushTicket_(0),
664 popTicket_(0),
665 pushSpinCutoff_(0),
666 popSpinCutoff_(0) {
667 if (queueCapacity == 0) {
668 throw std::invalid_argument(
669 "MPMCQueue with explicit capacity 0 is impossible"
670 // Stride computation in derived classes would sigfpe if capacity is 0
671 );
672 }
673
674 // ideally this would be a static assert, but g++ doesn't allow it
675 assert(
676 alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size);
677 assert(
678 static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
679 static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
680 static_cast<ptrdiff_t>(hardware_destructive_interference_size));
681 }
682
683 /// A default-constructed queue is useful because a usable (non-zero
684 /// capacity) queue can be moved onto it or swapped with it
685 MPMCQueueBase() noexcept
686 : capacity_(0),
687 slots_(nullptr),
688 stride_(0),
689 dstate_(0),
690 dcapacity_(0),
691 pushTicket_(0),
692 popTicket_(0),
693 pushSpinCutoff_(0),
694 popSpinCutoff_(0) {}
695
696 /// IMPORTANT: The move constructor is here to make it easier to perform
697 /// the initialization phase; it is not safe to use when there are any
698 /// concurrent accesses (this is not checked).
699 MPMCQueueBase(MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) noexcept
700 : capacity_(rhs.capacity_),
701 slots_(rhs.slots_),
702 stride_(rhs.stride_),
703 dstate_(rhs.dstate_.load(std::memory_order_relaxed)),
704 dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)),
705 pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)),
706 popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)),
707 pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)),
708 popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed)) {
709 // relaxed ops are okay for the previous reads, since rhs queue can't
710 // be in concurrent use
711
712 // zero out rhs
713 rhs.capacity_ = 0;
714 rhs.slots_ = nullptr;
715 rhs.stride_ = 0;
716 rhs.dstate_.store(0, std::memory_order_relaxed);
717 rhs.dcapacity_.store(0, std::memory_order_relaxed);
718 rhs.pushTicket_.store(0, std::memory_order_relaxed);
719 rhs.popTicket_.store(0, std::memory_order_relaxed);
720 rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
721 rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
722 }
723
724 /// IMPORTANT: The move operator is here to make it easier to perform
725 /// the initialization phase; it is not safe to use when there are any
726 /// concurrent accesses (this is not checked).
727 MPMCQueueBase<Derived<T, Atom, Dynamic>> const& operator=(
728 MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) {
729 if (this != &rhs) {
730 this->~MPMCQueueBase();
731 new (this) MPMCQueueBase(std::move(rhs));
732 }
733 return *this;
734 }
735
736 /// MPMCQueue can only be safely destroyed when there are no
737 /// pending enqueuers or dequeuers (this is not checked).
738 ~MPMCQueueBase() {
739 delete[] slots_;
740 }
741
742 /// Returns the number of writes (including threads that are blocked waiting
743 /// to write) minus the number of reads (including threads that are blocked
744 /// waiting to read). So effectively, it becomes:
745 /// elements in queue + pending(calls to write) - pending(calls to read).
746 /// If nothing is pending, then the method returns the actual number of
747 /// elements in the queue.
748 /// The returned value can be negative if there are no writers and the queue
749 /// is empty, but there is one reader that is blocked waiting to read (in
750 /// which case, the returned size will be -1).
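  /// For example, if three writes have completed and four reads have been
  /// issued (one of them still blocked waiting for an element), this returns
  /// 3 - 4 = -1.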
751 ssize_t size() const noexcept {
752 // since both pushes and pops increase monotonically, we can get a
753 // consistent snapshot either by bracketing a read of popTicket_ with
754 // two reads of pushTicket_ that return the same value, or the other
755 // way around. We maximize our chances by alternately attempting
756 // both bracketings.
757 uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
758 uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
759 while (true) {
760 uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
761 if (pushes == nextPushes) {
762 // pushTicket_ didn't change from A (or the previous C) to C,
763 // so we can linearize at B (or D)
764 return ssize_t(pushes - pops);
765 }
766 pushes = nextPushes;
767 uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
768 if (pops == nextPops) {
769 // popTicket_ didn't change from B (or the previous D), so we
770 // can linearize at C
771 return ssize_t(pushes - pops);
772 }
773 pops = nextPops;
774 }
775 }
776
777 /// Returns true if there are no items available for dequeue
778 bool isEmpty() const noexcept {
779 return size() <= 0;
780 }
781
782 /// Returns true if there is currently no empty space to enqueue
783 bool isFull() const noexcept {
784 // careful with signed -> unsigned promotion, since size can be negative
785 return size() >= static_cast<ssize_t>(capacity_);
786 }
787
788 /// Returns a guess at size() for contexts that don't need a precise
789 /// value, such as stats. More specifically, it returns the number of writes
790 /// minus the number of reads, but after reading the number of writes, more
791 /// writers could have come before the number of reads was sampled,
792 /// and this method doesn't protect against such a case.
793 /// The returned value can be negative.
794 ssize_t sizeGuess() const noexcept {
795 return writeCount() - readCount();
796 }
797
798 /// Doesn't change
799 size_t capacity() const noexcept {
800 return capacity_;
801 }
802
803 /// Doesn't change for non-dynamic
804 size_t allocatedCapacity() const noexcept {
805 return capacity_;
806 }
807
808 /// Returns the total number of calls to blockingWrite or successful
809 /// calls to write, including those blockingWrite calls that are
810 /// currently blocking
811 uint64_t writeCount() const noexcept {
812 return pushTicket_.load(std::memory_order_acquire);
813 }
814
815 /// Returns the total number of calls to blockingRead or successful
816 /// calls to read, including those blockingRead calls that are currently
817 /// blocking
818 uint64_t readCount() const noexcept {
819 return popTicket_.load(std::memory_order_acquire);
820 }
821
822 /// Enqueues a T constructed from args, blocking until space is
823 /// available. Note that this method signature allows enqueue via
824 /// move, if args is a T rvalue, via copy, if args is a T lvalue, or
825 /// via emplacement if args is an initializer list that can be passed
826 /// to a T constructor.
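  ///
  /// For example (s, a, and b are illustrative values):
  ///   q.blockingWrite(s);             // enqueue by copy
  ///   q.blockingWrite(std::move(s));  // enqueue by move
  ///   q.blockingWrite(a, b);          // construct the element in place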
827 template <typename... Args>
828 void blockingWrite(Args&&... args) noexcept {
829 enqueueWithTicketBase(
830 pushTicket_++, slots_, capacity_, stride_, std::forward<Args>(args)...);
831 }
832
833 /// If an item can be enqueued with no blocking, does so and returns
834 /// true, otherwise returns false. This method is similar to
835 /// writeIfNotFull, but if you don't have a specific need for that
836 /// method you should use this one.
837 ///
838 /// One of the common usages of this method is to enqueue via the
839 /// move constructor, something like q.write(std::move(x)). If write
840 /// returns false because the queue is full then x has not actually been
841 /// consumed, which looks strange. To understand why it is actually okay
842 /// to use x afterward, remember that std::move is just a typecast that
843 /// provides an rvalue reference that enables use of a move constructor
844 /// or operator. std::move doesn't actually move anything. It could
845 /// more accurately be called std::rvalue_cast or std::move_permission.
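  ///
  /// For example (x is an illustrative T value):
  ///   if (!q.write(std::move(x))) {
  ///     // the queue was full; x was never moved from and is still usable
  ///   }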
846 template <typename... Args>
847 bool write(Args&&... args) noexcept {
848 uint64_t ticket;
849 Slot* slots;
850 size_t cap;
851 int stride;
852 if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPushTicket(
853 ticket, slots, cap, stride)) {
854 // we have pre-validated that the ticket won't block
855 enqueueWithTicketBase(
856 ticket, slots, cap, stride, std::forward<Args>(args)...);
857 return true;
858 } else {
859 return false;
860 }
861 }
862
863 template <class Clock, typename... Args>
864 bool tryWriteUntil(
865 const std::chrono::time_point<Clock>& when,
866 Args&&... args) noexcept {
867 uint64_t ticket;
868 Slot* slots;
869 size_t cap;
870 int stride;
871 if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
872 // we have pre-validated that the ticket won't block, or rather that
873 // it won't block longer than it takes another thread to dequeue an
874 // element from the slot it identifies.
875 enqueueWithTicketBase(
876 ticket, slots, cap, stride, std::forward<Args>(args)...);
877 return true;
878 } else {
879 return false;
880 }
881 }
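
  // An illustrative pattern for tryWriteUntil (and, symmetrically,
  // tryReadUntil), assuming a 10 ms budget:
  //   auto deadline = std::chrono::steady_clock::now()
  //       + std::chrono::milliseconds(10);
  //   if (!q.tryWriteUntil(deadline, item)) { /* timed out */ }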
882
883 /// If the queue is not full, enqueues and returns true, otherwise
884 /// returns false. Unlike write this method can be blocked by another
885 /// thread, specifically a read that has linearized (been assigned
886 /// a ticket) but not yet completed. If you don't really need this
887 /// function you should probably use write.
888 ///
889 /// MPMCQueue isn't lock-free, so just because a read operation has
890 /// linearized (and isFull is false) doesn't mean that space has been
891 /// made available for another write. In this situation write will
892 /// return false, but writeIfNotFull will wait for the dequeue to finish.
893 /// This method is required if you are composing queues and managing
894 /// your own wakeup, because it guarantees that after every successful
895 /// write a readIfNotEmpty will succeed.
896 template <typename... Args>
897 bool writeIfNotFull(Args&&... args) noexcept {
898 uint64_t ticket;
899 Slot* slots;
900 size_t cap;
901 int stride;
902 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
903 ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
904 // some other thread is already dequeuing the slot into which we
905 // are going to enqueue, but we might have to wait for them to finish
906 enqueueWithTicketBase(
907 ticket, slots, cap, stride, std::forward<Args>(args)...);
908 return true;
909 } else {
910 return false;
911 }
912 }
913
914 /// Moves a dequeued element onto elem, blocking until an element
915 /// is available
916 void blockingRead(T& elem) noexcept {
917 uint64_t ticket;
918 static_cast<Derived<T, Atom, Dynamic>*>(this)->blockingReadWithTicket(
919 ticket, elem);
920 }
921
922 /// Same as blockingRead() but also records the ticket number
923 void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
924 assert(capacity_ != 0);
925 ticket = popTicket_++;
926 dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
927 }
928
929 /// If an item can be dequeued with no blocking, does so and returns
930 /// true, otherwise returns false.
931 bool read(T& elem) noexcept {
932 uint64_t ticket;
933 return readAndGetTicket(ticket, elem);
934 }
935
936 /// Same as read() but also records the ticket number
937 bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
938 Slot* slots;
939 size_t cap;
940 int stride;
941 if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPopTicket(
942 ticket, slots, cap, stride)) {
943 // the ticket has been pre-validated to not block
944 dequeueWithTicketBase(ticket, slots, cap, stride, elem);
945 return true;
946 } else {
947 return false;
948 }
949 }
950
951 template <class Clock, typename... Args>
952 bool tryReadUntil(
953 const std::chrono::time_point<Clock>& when,
954 T& elem) noexcept {
955 uint64_t ticket;
956 Slot* slots;
957 size_t cap;
958 int stride;
959 if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) {
960 // we have pre-validated that the ticket won't block, or rather that
961 // it won't block longer than it takes another thread to enqueue an
962 // element on the slot it identifies.
963 dequeueWithTicketBase(ticket, slots, cap, stride, elem);
964 return true;
965 } else {
966 return false;
967 }
968 }
969
970 /// If the queue is not empty, dequeues and returns true, otherwise
971 /// returns false. If the matching write is still in progress then this
972 /// method may block waiting for it. If you don't rely on being able
973 /// to dequeue (such as by counting completed writes) then you should
974 /// prefer read.
975 bool readIfNotEmpty(T& elem) noexcept {
976 uint64_t ticket;
977 Slot* slots;
978 size_t cap;
979 int stride;
980 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
981 ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
982 // the matching enqueue already has a ticket, but might not be done
983 dequeueWithTicketBase(ticket, slots, cap, stride, elem);
984 return true;
985 } else {
986 return false;
987 }
988 }
989
990 protected:
991 enum {
992 /// Once every kAdaptationFreq we will spin longer, to try to estimate
993 /// the proper spin backoff
994 kAdaptationFreq = 128,
995
996 /// To avoid false sharing in slots_ with neighboring memory
997 /// allocations, we pad it with this many SingleElementQueue-s at
998 /// each end
999 kSlotPadding =
1000 (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1
1001 };
1002
1003 /// The maximum number of items in the queue at once
1004 alignas(hardware_destructive_interference_size) size_t capacity_;
1005
1006 /// Anonymous union for use when Dynamic = false and true, respectively
1007 union {
1008 /// An array of capacity_ SingleElementQueue-s, each of which holds
1009 /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't
1010 /// touch the slots at either end, to avoid false sharing
1011 Slot* slots_;
1012 /// Current dynamic slots array of dcapacity_ SingleElementQueue-s
1013 Atom<Slot*> dslots_;
1014 };
1015
1016 /// Anonymous union for use when Dynamic = false and true, respectively
1017 union {
1018 /// The number of slots_ indices that we advance for each ticket, to
1019 /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_]
1020 /// aren't on the same cache line
1021 int stride_;
1022 /// Current stride
1023 Atom<int> dstride_;
1024 };
1025
1026 /// The following two members are used by dynamic MPMCQueue.
1027 /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get
1028 /// better cache locality if they are in the same cache line as
1029 /// dslots_ and dstride_.
1030 ///
1031 /// Dynamic state. A packed seqlock and ticket offset
1032 Atom<uint64_t> dstate_;
1033 /// Dynamic capacity
1034 Atom<size_t> dcapacity_;
1035
1036 /// Enqueuers get tickets from here
1037 alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_;
1038
1039 /// Dequeuers get tickets from here
1040 alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_;
1041
1042 /// This is how many times we will spin before using FUTEX_WAIT when
1043 /// the queue is full on enqueue, adaptively computed by occasionally
1044 /// spinning for longer and smoothing with an exponential moving average
1045 alignas(
1046 hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_;
1047
1048 /// The adaptive spin cutoff when the queue is empty on dequeue
1049 alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_;
1050
1051 /// Alignment doesn't prevent false sharing at the end of the struct,
1052 /// so fill out the last cache line
1053 char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)];
1054
1055 /// We assign tickets in increasing order, but we don't want to
1056 /// access neighboring elements of slots_ because that will lead to
1057 /// false sharing (multiple cores accessing the same cache line even
1058 /// though they aren't accessing the same bytes in that cache line).
1059 /// To avoid this we advance by stride slots per ticket.
1060 ///
1061 /// We need gcd(capacity, stride) to be 1 so that we will use all
1062 /// of the slots. We ensure this by only considering prime strides,
1063 /// which either have no common divisors with capacity or else have
1064 /// a zero remainder after dividing by capacity. That is sufficient
1065 /// to guarantee correctness, but we also want to actually spread the
1066 /// accesses away from each other to avoid false sharing (consider a
1067 /// stride of 7 with a capacity of 8). To that end we try a few, taking
1068 /// care to observe that advancing by -1 is as bad as advancing by 1
1069 /// when it comes to false sharing.
1070 ///
1071 /// The simple way to avoid false sharing would be to pad each
1072 /// SingleElementQueue, but since we have capacity_ of them that could
1073 /// waste a lot of space.
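  /// For example, with a capacity of 8: stride 2 is skipped because it
  /// divides 8, stride 7 is equivalent to advancing by -1 (separation 1),
  /// and stride 3 yields a separation of 3 slots, so 3 is chosen among the
  /// candidate primes below.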
1074 static int computeStride(size_t capacity) noexcept {
1075 static const int smallPrimes[] = {2, 3, 5, 7, 11, 13, 17, 19, 23};
1076
1077 int bestStride = 1;
1078 size_t bestSep = 1;
1079 for (int stride : smallPrimes) {
1080 if ((stride % capacity) == 0 || (capacity % stride) == 0) {
1081 continue;
1082 }
1083 size_t sep = stride % capacity;
1084 sep = std::min(sep, capacity - sep);
1085 if (sep > bestSep) {
1086 bestStride = stride;
1087 bestSep = sep;
1088 }
1089 }
1090 return bestStride;
1091 }
1092
1093 /// Returns the index into slots_ that should be used when enqueuing or
1094 /// dequeuing with the specified ticket
1095 size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
1096 return ((ticket * stride) % cap) + kSlotPadding;
1097 }
1098
1099 /// Maps an enqueue or dequeue ticket to the turn that should be used at the
1100 /// corresponding SingleElementQueue
1101 uint32_t turn(uint64_t ticket, size_t cap) noexcept {
1102 assert(cap != 0);
1103 return uint32_t(ticket / cap);
1104 }
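
  // For example, with cap = 10 and stride = 3, tickets 0..4 map to slot
  // indices 0, 3, 6, 9, 2 (each offset by kSlotPadding), and ticket 10 wraps
  // back to slot index 0 with turn 1.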
1105
1106 /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
1107 /// won't block. Returns true on immediate success, false on immediate
1108 /// failure.
1109 bool tryObtainReadyPushTicket(
1110 uint64_t& ticket,
1111 Slot*& slots,
1112 size_t& cap,
1113 int& stride) noexcept {
1114 ticket = pushTicket_.load(std::memory_order_acquire); // A
1115 slots = slots_;
1116 cap = capacity_;
1117 stride = stride_;
1118 while (true) {
1119 if (!slots[idx(ticket, cap, stride)].mayEnqueue(turn(ticket, cap))) {
1120 // if we call enqueue(ticket, ...) on the SingleElementQueue
1121 // right now it would block, but this might no longer be the next
1122 // ticket. We can increase the chance of tryEnqueue success under
1123 // contention (without blocking) by rechecking the ticket dispenser
1124 auto prev = ticket;
1125 ticket = pushTicket_.load(std::memory_order_acquire); // B
1126 if (prev == ticket) {
1127 // mayEnqueue was bracketed by two reads (A or prev B or prev
1128 // failing CAS to B), so we are definitely unable to enqueue
1129 return false;
1130 }
1131 } else {
1132 // we will bracket the mayEnqueue check with a read (A or prev B
1133 // or prev failing CAS) and the following CAS. If the CAS fails
1134 // it will effect a load of pushTicket_
1135 if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
1136 return true;
1137 }
1138 }
1139 }
1140 }
1141
1142 /// Tries until when to obtain a push ticket for which
1143 /// SingleElementQueue::enqueue won't block. Returns true on success, false
1144 /// on failure.
1145 /// ticket is filled on success AND failure.
1146 template <class Clock>
1147 bool tryObtainPromisedPushTicketUntil(
1148 uint64_t& ticket,
1149 Slot*& slots,
1150 size_t& cap,
1151 int& stride,
1152 const std::chrono::time_point<Clock>& when) noexcept {
1153 bool deadlineReached = false;
1154 while (!deadlineReached) {
1155 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
1156 ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
1157 return true;
1158 }
1159 // ticket is a blocking ticket until the preceding ticket has been
1160 // processed: wait until this ticket's turn arrives. We have not reserved
1161 // this ticket so we will have to re-attempt to get a non-blocking ticket
1162 // if we wake up before we time out.
1163 deadlineReached =
1164 !slots[idx(ticket, cap, stride)].tryWaitForEnqueueTurnUntil(
1165 turn(ticket, cap),
1166 pushSpinCutoff_,
1167 (ticket % kAdaptationFreq) == 0,
1168 when);
1169 }
1170 return false;
1171 }
1172
1173 /// Tries to obtain a push ticket which can be satisfied if all
1174 /// in-progress pops complete. This function does not block, but
1175 /// blocking may be required when using the returned ticket if some
1176 /// other thread's pop is still in progress (ticket has been granted but
1177 /// pop has not yet completed).
1178 bool tryObtainPromisedPushTicket(
1179 uint64_t& ticket,
1180 Slot*& slots,
1181 size_t& cap,
1182 int& stride) noexcept {
1183 auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
1184 slots = slots_;
1185 cap = capacity_;
1186 stride = stride_;
1187 while (true) {
1188 ticket = numPushes;
1189 const auto numPops = popTicket_.load(std::memory_order_acquire); // B
1190 // n will be negative if pops are pending
1191 const int64_t n = int64_t(numPushes - numPops);
1192 if (n >= static_cast<ssize_t>(capacity_)) {
1193 // Full, linearize at B. We don't need to recheck the read we
1194 // performed at A, because if numPushes was stale at B then the
1195 // real numPushes value is even worse
1196 return false;
1197 }
1198 if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
1199 return true;
1200 }
1201 }
1202 }
1203
1204 /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
1205 /// won't block. Returns true on immediate success, false on immediate
1206 /// failure.
1207 bool tryObtainReadyPopTicket(
1208 uint64_t& ticket,
1209 Slot*& slots,
1210 size_t& cap,
1211 int& stride) noexcept {
1212 ticket = popTicket_.load(std::memory_order_acquire);
1213 slots = slots_;
1214 cap = capacity_;
1215 stride = stride_;
1216 while (true) {
1217 if (!slots[idx(ticket, cap, stride)].mayDequeue(turn(ticket, cap))) {
1218 auto prev = ticket;
1219 ticket = popTicket_.load(std::memory_order_acquire);
1220 if (prev == ticket) {
1221 return false;
1222 }
1223 } else {
1224 if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
1225 return true;
1226 }
1227 }
1228 }
1229 }
1230
1231 /// Tries until when to obtain a pop ticket for which
1232 /// SingleElementQueue::dequeue won't block. Returns true on success, false
1233 /// on failure.
1234 /// ticket is filled on success AND failure.
1235 template <class Clock>
1236 bool tryObtainPromisedPopTicketUntil(
1237 uint64_t& ticket,
1238 Slot*& slots,
1239 size_t& cap,
1240 int& stride,
1241 const std::chrono::time_point<Clock>& when) noexcept {
1242 bool deadlineReached = false;
1243 while (!deadlineReached) {
1244 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
1245 ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
1246 return true;
1247 }
1248 // ticket is a blocking ticket until the preceding ticket has been
1249 // processed: wait until this ticket's turn arrives. We have not reserved
1250 // this ticket so we will have to re-attempt to get a non-blocking ticket
1251 // if we wake up before we time out.
1252 deadlineReached =
1253 !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil(
1254 turn(ticket, cap),
1255 pushSpinCutoff_,
1256 (ticket % kAdaptationFreq) == 0,
1257 when);
1258 }
1259 return false;
1260 }
1261
1262 /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose
1263 /// corresponding push ticket has already been handed out, rather than
1264 /// returning one whose corresponding push ticket has already been
1265 /// completed. This means that there is a possibility that the caller
1266 /// will block when using the ticket, but it allows the user to rely on
1267 /// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket
1268 /// will return true. The "try" part of this is that we won't have
1269 /// to block waiting for someone to call enqueue, although we might
1270 /// have to block waiting for them to finish executing code inside the
1271 /// MPMCQueue itself.
1272 bool tryObtainPromisedPopTicket(
1273 uint64_t& ticket,
1274 Slot*& slots,
1275 size_t& cap,
1276 int& stride) noexcept {
1277 auto numPops = popTicket_.load(std::memory_order_acquire); // A
1278 slots = slots_;
1279 cap = capacity_;
1280 stride = stride_;
1281 while (true) {
1282 ticket = numPops;
1283 const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
1284 if (numPops >= numPushes) {
1285 // Empty, or empty with pending pops. Linearize at B. We don't
1286 // need to recheck the read we performed at A, because if numPops
1287 // is stale then the fresh value is larger and the >= is still true
1288 return false;
1289 }
1290 if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
1291 return true;
1292 }
1293 }
1294 }
1295
1296 // Given a ticket, constructs an enqueued item using args
1297 template <typename... Args>
1298 void enqueueWithTicketBase(
1299 uint64_t ticket,
1300 Slot* slots,
1301 size_t cap,
1302 int stride,
1303 Args&&... args) noexcept {
1304 slots[idx(ticket, cap, stride)].enqueue(
1305 turn(ticket, cap),
1306 pushSpinCutoff_,
1307 (ticket % kAdaptationFreq) == 0,
1308 std::forward<Args>(args)...);
1309 }
1310
1311 // To support tracking ticket numbers in MPMCPipelineStageImpl
1312 template <typename... Args>
1313 void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
1314 enqueueWithTicketBase(
1315 ticket, slots_, capacity_, stride_, std::forward<Args>(args)...);
1316 }
1317
1318 // Given a ticket, dequeues the corresponding element
1319 void dequeueWithTicketBase(
1320 uint64_t ticket,
1321 Slot* slots,
1322 size_t cap,
1323 int stride,
1324 T& elem) noexcept {
1325 assert(cap != 0);
1326 slots[idx(ticket, cap, stride)].dequeue(
1327 turn(ticket, cap),
1328 popSpinCutoff_,
1329 (ticket % kAdaptationFreq) == 0,
1330 elem);
1331 }
1332};
1333
1334/// SingleElementQueue implements a blocking queue that holds at most one
1335/// item, and that requires its users to assign incrementing identifiers
1336/// (turns) to each enqueue and dequeue operation. Note that the turns
1337/// used by SingleElementQueue are doubled inside the TurnSequencer
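/// For example, the enqueue for turn t waits on sequencer turn 2*t and the
/// matching dequeue waits on turn 2*t + 1, so enqueues and dequeues of the
/// same slot strictly alternate.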
1338template <typename T, template <typename> class Atom>
1339struct SingleElementQueue {
1340 ~SingleElementQueue() noexcept {
1341 if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
1342 // we are pending a dequeue, so we have a constructed item
1343 destroyContents();
1344 }
1345 }
1346
1347 /// enqueue using in-place noexcept construction
1348 template <
1349 typename... Args,
1350 typename = typename std::enable_if<
1351 std::is_nothrow_constructible<T, Args...>::value>::type>
1352 void enqueue(
1353 const uint32_t turn,
1354 Atom<uint32_t>& spinCutoff,
1355 const bool updateSpinCutoff,
1356 Args&&... args) noexcept {
1357 sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
1358 new (&contents_) T(std::forward<Args>(args)...);
1359 sequencer_.completeTurn(turn * 2);
1360 }
1361
1362 /// enqueue using move construction, either real (if
1363 /// is_nothrow_move_constructible) or simulated using relocation and
1364 /// default construction (if IsRelocatable and is_nothrow_constructible)
1365 template <
1366 typename = typename std::enable_if<
1367 (folly::IsRelocatable<T>::value &&
1368 std::is_nothrow_constructible<T>::value) ||
1369 std::is_nothrow_constructible<T, T&&>::value>::type>
1370 void enqueue(
1371 const uint32_t turn,
1372 Atom<uint32_t>& spinCutoff,
1373 const bool updateSpinCutoff,
1374 T&& goner) noexcept {
1375 enqueueImpl(
1376 turn,
1377 spinCutoff,
1378 updateSpinCutoff,
1379 std::move(goner),
1380 typename std::conditional<
1381 std::is_nothrow_constructible<T, T&&>::value,
1382 ImplByMove,
1383 ImplByRelocation>::type());
1384 }
1385
1386 /// Waits until either:
1387 /// 1: the dequeue turn preceding the given enqueue turn has arrived
1388 /// 2: the given deadline has arrived
1389 /// Case 1 returns true, case 2 returns false.
1390 template <class Clock>
1391 bool tryWaitForEnqueueTurnUntil(
1392 const uint32_t turn,
1393 Atom<uint32_t>& spinCutoff,
1394 const bool updateSpinCutoff,
1395 const std::chrono::time_point<Clock>& when) noexcept {
1396 return sequencer_.tryWaitForTurn(
1397 turn * 2, spinCutoff, updateSpinCutoff, &when) !=
1398 TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
1399 }
1400
1401 bool mayEnqueue(const uint32_t turn) const noexcept {
1402 return sequencer_.isTurn(turn * 2);
1403 }
1404
1405 void dequeue(
1406 uint32_t turn,
1407 Atom<uint32_t>& spinCutoff,
1408 const bool updateSpinCutoff,
1409 T& elem) noexcept {
1410 dequeueImpl(
1411 turn,
1412 spinCutoff,
1413 updateSpinCutoff,
1414 elem,
1415 typename std::conditional<
1416 folly::IsRelocatable<T>::value,
1417 ImplByRelocation,
1418 ImplByMove>::type());
1419 }
1420
1421 /// Waits until either:
1422 /// 1: the enqueue turn preceding the given dequeue turn has arrived
1423 /// 2: the given deadline has arrived
1424 /// Case 1 returns true, case 2 returns false.
1425 template <class Clock>
1426 bool tryWaitForDequeueTurnUntil(
1427 const uint32_t turn,
1428 Atom<uint32_t>& spinCutoff,
1429 const bool updateSpinCutoff,
1430 const std::chrono::time_point<Clock>& when) noexcept {
1431 return sequencer_.tryWaitForTurn(
1432 turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) !=
1433 TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
1434 }
1435
1436 bool mayDequeue(const uint32_t turn) const noexcept {
1437 return sequencer_.isTurn(turn * 2 + 1);
1438 }
1439
1440 private:
1441 /// Storage for a T constructed with placement new
1442 aligned_storage_for_t<T> contents_;
1443
1444 /// Even turns are pushes, odd turns are pops
1445 TurnSequencer<Atom> sequencer_;
1446
1447 T* ptr() noexcept {
1448 return static_cast<T*>(static_cast<void*>(&contents_));
1449 }
1450
1451 void destroyContents() noexcept {
1452 try {
1453 ptr()->~T();
1454 } catch (...) {
1455 // g++ doesn't seem to have std::is_nothrow_destructible yet
1456 }
1457#ifndef NDEBUG
1458 memset(&contents_, 'Q', sizeof(T));
1459#endif
1460 }
1461
1462 /// Tag classes for dispatching to enqueue/dequeue implementation.
1463 struct ImplByRelocation {};
1464 struct ImplByMove {};
1465
1466 /// enqueue using nothrow move construction.
1467 void enqueueImpl(
1468 const uint32_t turn,
1469 Atom<uint32_t>& spinCutoff,
1470 const bool updateSpinCutoff,
1471 T&& goner,
1472 ImplByMove) noexcept {
1473 sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
1474 new (&contents_) T(std::move(goner));
1475 sequencer_.completeTurn(turn * 2);
1476 }
1477
1478 /// enqueue by simulating a nothrow move with relocation of the source
1479 /// into the slot, followed by default construction of the moved-from source.
1480 void enqueueImpl(
1481 const uint32_t turn,
1482 Atom<uint32_t>& spinCutoff,
1483 const bool updateSpinCutoff,
1484 T&& goner,
1485 ImplByRelocation) noexcept {
1486 sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
1487 memcpy(
1488 static_cast<void*>(&contents_),
1489 static_cast<void const*>(&goner),
1490 sizeof(T));
1491 sequencer_.completeTurn(turn * 2);
1492 new (&goner) T();
1493 }
1494
1495 /// dequeue by destructing followed by relocation. This version is preferred,
1496 /// because as much work as possible can be done before waiting.
1497 void dequeueImpl(
1498 uint32_t turn,
1499 Atom<uint32_t>& spinCutoff,
1500 const bool updateSpinCutoff,
1501 T& elem,
1502 ImplByRelocation) noexcept {
1503 try {
1504 elem.~T();
1505 } catch (...) {
1506 // unlikely, but if we don't complete our turn the queue will die
1507 }
1508 sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
1509 memcpy(
1510 static_cast<void*>(&elem),
1511 static_cast<void const*>(&contents_),
1512 sizeof(T));
1513 sequencer_.completeTurn(turn * 2 + 1);
1514 }
1515
1516 /// dequeue by nothrow move assignment.
1517 void dequeueImpl(
1518 uint32_t turn,
1519 Atom<uint32_t>& spinCutoff,
1520 const bool updateSpinCutoff,
1521 T& elem,
1522 ImplByMove) noexcept {
1523 sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
1524 elem = std::move(*ptr());
1525 destroyContents();
1526 sequencer_.completeTurn(turn * 2 + 1);
1527 }
1528};
1529
1530} // namespace detail
1531
1532} // namespace folly
1533