1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <limits>
#include <stdexcept>
#include <type_traits>

#include <boost/noncopyable.hpp>

#include <folly/Traits.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/portability/Asm.h>
#include <folly/portability/Unistd.h>
32 | |
33 | namespace folly { |
34 | |
35 | namespace detail { |
36 | |
37 | template <typename T, template <typename> class Atom> |
38 | struct SingleElementQueue; |
39 | |
40 | template <typename T> |
41 | class MPMCPipelineStageImpl; |
42 | |
43 | /// MPMCQueue base CRTP template |
44 | template <typename> |
45 | class MPMCQueueBase; |
46 | |
47 | } // namespace detail |
48 | |
49 | /// MPMCQueue<T> is a high-performance bounded concurrent queue that |
50 | /// supports multiple producers, multiple consumers, and optional blocking. |
51 | /// The queue has a fixed capacity, for which all memory will be allocated |
52 | /// up front. The bulk of the work of enqueuing and dequeuing can be |
53 | /// performed in parallel. |
54 | /// |
55 | /// MPMCQueue is linearizable. That means that if a call to write(A) |
56 | /// returns before a call to write(B) begins, then A will definitely end up |
57 | /// in the queue before B, and if a call to read(X) returns before a call |
/// to read(Y) is started, then X will be something from earlier in the
59 | /// queue than Y. This also means that if a read call returns a value, you |
60 | /// can be sure that all previous elements of the queue have been assigned |
61 | /// a reader (that reader might not yet have returned, but it exists). |
62 | /// |
63 | /// The underlying implementation uses a ticket dispenser for the head and |
64 | /// the tail, spreading accesses across N single-element queues to produce |
65 | /// a queue with capacity N. The ticket dispensers use atomic increment, |
66 | /// which is more robust to contention than a CAS loop. Each of the |
67 | /// single-element queues uses its own CAS to serialize access, with an |
68 | /// adaptive spin cutoff. When spinning fails on a single-element queue |
69 | /// it uses futex()'s _BITSET operations to reduce unnecessary wakeups |
70 | /// even if multiple waiters are present on an individual queue (such as |
71 | /// when the MPMCQueue's capacity is smaller than the number of enqueuers |
72 | /// or dequeuers). |
73 | /// |
74 | /// In benchmarks (contained in tao/queues/ConcurrentQueueTests) |
75 | /// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better |
76 | /// than any of the alternatives present in fbcode, for both small (~10) |
77 | /// and large capacities. In these benchmarks it is also faster than |
78 | /// tbb::concurrent_bounded_queue for all configurations. When there are |
79 | /// many more threads than cores, MPMCQueue is _much_ faster than the tbb |
80 | /// queue because it uses futex() to block and unblock waiting threads, |
81 | /// rather than spinning with sched_yield. |
82 | /// |
/// NOEXCEPT INTERACTION: tl;dr: if it compiles, you're fine. Ticket-based
84 | /// queues separate the assignment of queue positions from the actual |
85 | /// construction of the in-queue elements, which means that the T |
86 | /// constructor used during enqueue must not throw an exception. This is |
87 | /// enforced at compile time using type traits, which requires that T be |
88 | /// adorned with accurate noexcept information. If your type does not |
89 | /// use noexcept, you will have to wrap it in something that provides |
90 | /// the guarantee. We provide an alternate safe implementation for types |
91 | /// that don't use noexcept but that are marked folly::IsRelocatable |
92 | /// and std::is_nothrow_constructible, which is common for folly types. |
93 | /// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE |
94 | /// then your type can be put in MPMCQueue. |
95 | /// |
96 | /// If you have a pool of N queue consumers that you want to shut down |
97 | /// after the queue has drained, one way is to enqueue N sentinel values |
/// to the queue. If the producer doesn't know how many consumers there
/// are, you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives one (by requeuing 2, the shutdown can
/// complete in O(log P) time instead of O(P)).
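///
/// A minimal usage sketch (illustrative only):
///
///   folly::MPMCQueue<int> q(1024); // capacity is fixed at construction
///   q.blockingWrite(42);           // blocks while the queue is full
///   int v;
///   q.blockingRead(v);             // blocks while the queue is empty
///   bool ok = q.write(7);          // returns false if full, never blocks
///   ok = q.read(v);                // returns false if empty, never blocks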
102 | template < |
103 | typename T, |
104 | template <typename> class Atom = std::atomic, |
105 | bool Dynamic = false> |
106 | class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> { |
107 | friend class detail::MPMCPipelineStageImpl<T>; |
108 | using Slot = detail::SingleElementQueue<T, Atom>; |
109 | |
110 | public: |
111 | explicit MPMCQueue(size_t queueCapacity) |
112 | : detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>>(queueCapacity) { |
113 | this->stride_ = this->computeStride(queueCapacity); |
114 | this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding]; |
115 | } |
116 | |
117 | MPMCQueue() noexcept {} |
118 | }; |
119 | |
120 | /// The dynamic version of MPMCQueue allows dynamic expansion of queue |
121 | /// capacity, such that a queue may start with a smaller capacity than |
122 | /// specified and expand only if needed. Users may optionally specify |
123 | /// the initial capacity and the expansion multiplier. |
124 | /// |
125 | /// The design uses a seqlock to enforce mutual exclusion among |
126 | /// expansion attempts. Regular operations read up-to-date queue |
127 | /// information (slots array, capacity, stride) inside read-only |
128 | /// seqlock sections, which are unimpeded when no expansion is in |
129 | /// progress. |
130 | /// |
131 | /// An expansion computes a new capacity, allocates a new slots array, |
132 | /// and updates stride. No information needs to be copied from the |
/// current slots array to the new one. Because of this, the new slots
/// will not have sequence numbers that match ticket numbers. The
/// expansion therefore computes a ticket offset, which operations that
/// use the new array subtract from their tickets when calculating slot
/// indexes and sequence numbers, to account for the new slots starting
/// with sequence numbers of zero. The current ticket offset is
139 | /// packed with the seqlock in an atomic 64-bit integer. The initial |
140 | /// offset is zero. |
141 | /// |
142 | /// Lagging write and read operations with tickets lower than the |
143 | /// ticket offset of the current slots array (i.e., the minimum ticket |
144 | /// number that can be served by the current array) must use earlier |
145 | /// closed arrays instead of the current one. Information about closed |
146 | /// slots arrays (array address, capacity, stride, and offset) is |
147 | /// maintained in a logarithmic-sized structure. Each entry in that |
148 | /// structure never needs to be changed once set. The number of closed |
149 | /// arrays is half the value of the seqlock (when unlocked). |
150 | /// |
151 | /// The acquisition of the seqlock to perform an expansion does not |
152 | /// prevent the issuing of new push and pop tickets concurrently. The |
153 | /// expansion must set the new ticket offset to a value that couldn't |
154 | /// have been issued to an operation that has already gone through a |
155 | /// seqlock read-only section (and hence obtained information for |
156 | /// older closed arrays). |
157 | /// |
158 | /// Note that the total queue capacity can temporarily exceed the |
159 | /// specified capacity when there are lagging consumers that haven't |
160 | /// yet consumed all the elements in closed arrays. Users should not |
161 | /// rely on the capacity of dynamic queues for synchronization, e.g., |
162 | /// they should not expect that a thread will definitely block on a |
163 | /// call to blockingWrite() when the queue size is known to be equal |
164 | /// to its capacity. |
165 | /// |
166 | /// Note that some writeIfNotFull() and tryWriteUntil() operations may |
167 | /// fail even if the size of the queue is less than its maximum |
168 | /// capacity and despite the success of expansion, if the operation |
169 | /// happens to acquire a ticket that belongs to a closed array. This |
/// is a transient condition. Typically, one or two ticket values may
/// be subject to this condition per expansion.
172 | /// |
173 | /// The dynamic version is a partial specialization of MPMCQueue with |
174 | /// Dynamic == true |
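///
/// A construction sketch (illustrative; the numeric arguments are
/// arbitrary):
///
///   // may start at kDefaultMinDynamicCapacity (10) and expand toward
///   // 1000 only as needed
///   folly::MPMCQueue<int, std::atomic, true> q1(1000);
///   // start at capacity 16, multiply by 4 per expansion, up to 1024
///   folly::MPMCQueue<int, std::atomic, true> q2(1024, 16, 4);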
175 | template <typename T, template <typename> class Atom> |
176 | class MPMCQueue<T, Atom, true> |
177 | : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> { |
178 | friend class detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>; |
179 | using Slot = detail::SingleElementQueue<T, Atom>; |
180 | |
181 | struct ClosedArray { |
182 | uint64_t offset_{0}; |
183 | Slot* slots_{nullptr}; |
184 | size_t capacity_{0}; |
185 | int stride_{0}; |
186 | }; |
187 | |
188 | public: |
189 | explicit MPMCQueue(size_t queueCapacity) |
190 | : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) { |
191 | size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity); |
192 | initQueue(cap, kDefaultExpansionMultiplier); |
193 | } |
194 | |
195 | explicit MPMCQueue( |
196 | size_t queueCapacity, |
197 | size_t minCapacity, |
198 | size_t expansionMultiplier) |
199 | : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) { |
200 | minCapacity = std::max<size_t>(1, minCapacity); |
201 | size_t cap = std::min<size_t>(minCapacity, queueCapacity); |
202 | expansionMultiplier = std::max<size_t>(2, expansionMultiplier); |
203 | initQueue(cap, expansionMultiplier); |
204 | } |
205 | |
206 | MPMCQueue() noexcept { |
207 | dmult_ = 0; |
208 | closed_ = nullptr; |
209 | } |
210 | |
211 | MPMCQueue(MPMCQueue<T, Atom, true>&& rhs) noexcept { |
212 | this->capacity_ = rhs.capacity_; |
213 | new (&this->dslots_) |
214 | Atom<Slot*>(rhs.dslots_.load(std::memory_order_relaxed)); |
215 | new (&this->dstride_) |
216 | Atom<int>(rhs.dstride_.load(std::memory_order_relaxed)); |
217 | this->dstate_.store( |
218 | rhs.dstate_.load(std::memory_order_relaxed), std::memory_order_relaxed); |
219 | this->dcapacity_.store( |
220 | rhs.dcapacity_.load(std::memory_order_relaxed), |
221 | std::memory_order_relaxed); |
222 | this->pushTicket_.store( |
223 | rhs.pushTicket_.load(std::memory_order_relaxed), |
224 | std::memory_order_relaxed); |
225 | this->popTicket_.store( |
226 | rhs.popTicket_.load(std::memory_order_relaxed), |
227 | std::memory_order_relaxed); |
228 | this->pushSpinCutoff_.store( |
229 | rhs.pushSpinCutoff_.load(std::memory_order_relaxed), |
230 | std::memory_order_relaxed); |
231 | this->popSpinCutoff_.store( |
232 | rhs.popSpinCutoff_.load(std::memory_order_relaxed), |
233 | std::memory_order_relaxed); |
234 | dmult_ = rhs.dmult_; |
235 | closed_ = rhs.closed_; |
236 | |
237 | rhs.capacity_ = 0; |
238 | rhs.dslots_.store(nullptr, std::memory_order_relaxed); |
239 | rhs.dstride_.store(0, std::memory_order_relaxed); |
240 | rhs.dstate_.store(0, std::memory_order_relaxed); |
241 | rhs.dcapacity_.store(0, std::memory_order_relaxed); |
242 | rhs.pushTicket_.store(0, std::memory_order_relaxed); |
243 | rhs.popTicket_.store(0, std::memory_order_relaxed); |
244 | rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed); |
245 | rhs.popSpinCutoff_.store(0, std::memory_order_relaxed); |
246 | rhs.dmult_ = 0; |
247 | rhs.closed_ = nullptr; |
248 | } |
249 | |
250 | MPMCQueue<T, Atom, true> const& operator=(MPMCQueue<T, Atom, true>&& rhs) { |
251 | if (this != &rhs) { |
252 | this->~MPMCQueue(); |
253 | new (this) MPMCQueue(std::move(rhs)); |
254 | } |
255 | return *this; |
256 | } |
257 | |
258 | ~MPMCQueue() { |
259 | if (closed_ != nullptr) { |
260 | for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) { |
261 | delete[] closed_[i].slots_; |
262 | } |
263 | delete[] closed_; |
264 | } |
265 | using AtomInt = Atom<int>; |
266 | this->dstride_.~AtomInt(); |
267 | using AtomSlot = Atom<Slot*>; |
268 | // Sort of a hack to get ~MPMCQueueBase to free dslots_ |
269 | auto slots = this->dslots_.load(); |
270 | this->dslots_.~AtomSlot(); |
271 | this->slots_ = slots; |
272 | } |
273 | |
274 | size_t allocatedCapacity() const noexcept { |
275 | return this->dcapacity_.load(std::memory_order_relaxed); |
276 | } |
277 | |
278 | template <typename... Args> |
279 | void blockingWrite(Args&&... args) noexcept { |
280 | uint64_t ticket = this->pushTicket_++; |
281 | Slot* slots; |
282 | size_t cap; |
283 | int stride; |
284 | uint64_t state; |
285 | uint64_t offset; |
286 | do { |
287 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
288 | asm_volatile_pause(); |
289 | continue; |
290 | } |
291 | if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) { |
292 | // There was an expansion after this ticket was issued. |
293 | break; |
294 | } |
295 | if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( |
296 | this->turn(ticket - offset, cap))) { |
297 | // A slot is ready. No need to expand. |
298 | break; |
299 | } else if ( |
300 | this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) { |
301 | // May block, but a pop is in progress. No need to expand. |
302 | // Get seqlock read section info again in case an expansion |
303 | // occurred with an equal or higher ticket. |
304 | continue; |
305 | } else { |
306 | // May block. See if we can expand. |
307 | if (tryExpand(state, cap)) { |
308 | // This or another thread started an expansion. Get updated info. |
309 | continue; |
310 | } else { |
311 | // Can't expand. |
312 | break; |
313 | } |
314 | } |
315 | } while (true); |
316 | this->enqueueWithTicketBase( |
317 | ticket - offset, slots, cap, stride, std::forward<Args>(args)...); |
318 | } |
319 | |
320 | void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept { |
321 | ticket = this->popTicket_++; |
322 | Slot* slots; |
323 | size_t cap; |
324 | int stride; |
325 | uint64_t state; |
326 | uint64_t offset; |
327 | while (!trySeqlockReadSection(state, slots, cap, stride)) { |
328 | asm_volatile_pause(); |
329 | } |
330 | // If there was an expansion after the corresponding push ticket |
331 | // was issued, adjust accordingly |
332 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
333 | this->dequeueWithTicketBase(ticket - offset, slots, cap, stride, elem); |
334 | } |
335 | |
336 | private: |
337 | enum { |
338 | kSeqlockBits = 6, |
339 | kDefaultMinDynamicCapacity = 10, |
340 | kDefaultExpansionMultiplier = 10, |
341 | }; |
342 | |
  // Expansion multiplier used to compute new capacities
  size_t dmult_;
344 | |
345 | // Info about closed slots arrays for use by lagging operations |
346 | ClosedArray* closed_; |
347 | |
348 | void initQueue(const size_t cap, const size_t mult) { |
349 | new (&this->dstride_) Atom<int>(this->computeStride(cap)); |
350 | Slot* slots = new Slot[cap + 2 * this->kSlotPadding]; |
351 | new (&this->dslots_) Atom<Slot*>(slots); |
352 | this->dstate_.store(0); |
353 | this->dcapacity_.store(cap); |
354 | dmult_ = mult; |
355 | size_t maxClosed = 0; |
356 | for (size_t expanded = cap; expanded < this->capacity_; expanded *= mult) { |
357 | ++maxClosed; |
358 | } |
359 | closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr; |
360 | } |
361 | |
362 | bool tryObtainReadyPushTicket( |
363 | uint64_t& ticket, |
364 | Slot*& slots, |
365 | size_t& cap, |
366 | int& stride) noexcept { |
367 | uint64_t state; |
368 | do { |
369 | ticket = this->pushTicket_.load(std::memory_order_acquire); // A |
370 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
371 | asm_volatile_pause(); |
372 | continue; |
373 | } |
374 | |
375 | // If there was an expansion with offset greater than this ticket, |
376 | // adjust accordingly |
377 | uint64_t offset; |
378 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
379 | |
380 | if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( |
381 | this->turn(ticket - offset, cap))) { |
382 | // A slot is ready. |
383 | if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
384 | // Adjust ticket |
385 | ticket -= offset; |
386 | return true; |
387 | } else { |
388 | continue; |
389 | } |
390 | } else { |
391 | if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B |
392 | // Try again. Ticket changed. |
393 | continue; |
394 | } |
395 | // Likely to block. |
396 | // Try to expand unless the ticket is for a closed array |
397 | if (offset == getOffset(state)) { |
398 | if (tryExpand(state, cap)) { |
399 | // This or another thread started an expansion. Get up-to-date info. |
400 | continue; |
401 | } |
402 | } |
403 | return false; |
404 | } |
405 | } while (true); |
406 | } |
407 | |
408 | bool tryObtainPromisedPushTicket( |
409 | uint64_t& ticket, |
410 | Slot*& slots, |
411 | size_t& cap, |
412 | int& stride) noexcept { |
413 | uint64_t state; |
414 | do { |
415 | ticket = this->pushTicket_.load(std::memory_order_acquire); |
416 | auto numPops = this->popTicket_.load(std::memory_order_acquire); |
417 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
418 | asm_volatile_pause(); |
419 | continue; |
420 | } |
421 | |
422 | const auto curCap = cap; |
423 | // If there was an expansion with offset greater than this ticket, |
424 | // adjust accordingly |
425 | uint64_t offset; |
426 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
427 | |
428 | int64_t n = ticket - numPops; |
429 | |
430 | if (n >= static_cast<ssize_t>(cap)) { |
431 | if ((cap == curCap) && tryExpand(state, cap)) { |
432 | // This or another thread started an expansion. Start over. |
433 | continue; |
434 | } |
435 | // Can't expand. |
436 | ticket -= offset; |
437 | return false; |
438 | } |
439 | |
440 | if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
441 | // Adjust ticket |
442 | ticket -= offset; |
443 | return true; |
444 | } |
445 | } while (true); |
446 | } |
447 | |
448 | bool tryObtainReadyPopTicket( |
449 | uint64_t& ticket, |
450 | Slot*& slots, |
451 | size_t& cap, |
452 | int& stride) noexcept { |
453 | uint64_t state; |
454 | do { |
455 | ticket = this->popTicket_.load(std::memory_order_relaxed); |
456 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
457 | asm_volatile_pause(); |
458 | continue; |
459 | } |
460 | |
461 | // If there was an expansion after the corresponding push ticket |
462 | // was issued, adjust accordingly |
463 | uint64_t offset; |
464 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
465 | |
466 | if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue( |
467 | this->turn(ticket - offset, cap))) { |
468 | if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
469 | // Adjust ticket |
470 | ticket -= offset; |
471 | return true; |
472 | } |
473 | } else { |
474 | return false; |
475 | } |
476 | } while (true); |
477 | } |
478 | |
479 | bool tryObtainPromisedPopTicket( |
480 | uint64_t& ticket, |
481 | Slot*& slots, |
482 | size_t& cap, |
483 | int& stride) noexcept { |
484 | uint64_t state; |
485 | do { |
486 | ticket = this->popTicket_.load(std::memory_order_acquire); |
487 | auto numPushes = this->pushTicket_.load(std::memory_order_acquire); |
488 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
489 | asm_volatile_pause(); |
490 | continue; |
491 | } |
492 | |
493 | uint64_t offset; |
494 | // If there was an expansion after the corresponding push |
495 | // ticket was issued, adjust accordingly |
496 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
497 | |
498 | if (ticket >= numPushes) { |
499 | ticket -= offset; |
500 | return false; |
501 | } |
502 | if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
503 | ticket -= offset; |
504 | return true; |
505 | } |
506 | } while (true); |
507 | } |
508 | |
509 | /// Enqueues an element with a specific ticket number |
510 | template <typename... Args> |
511 | void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept { |
512 | Slot* slots; |
513 | size_t cap; |
514 | int stride; |
515 | uint64_t state; |
516 | uint64_t offset; |
517 | |
518 | while (!trySeqlockReadSection(state, slots, cap, stride)) { |
519 | } |
520 | |
521 | // If there was an expansion after this ticket was issued, adjust |
522 | // accordingly |
523 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
524 | |
525 | this->enqueueWithTicketBase( |
526 | ticket - offset, slots, cap, stride, std::forward<Args>(args)...); |
527 | } |
528 | |
  /// dstate_ packs three fields: bit 0 is the seqlock bit, the rest of
  /// the low kSeqlockBits bits hold twice the number of closed arrays,
  /// and the high bits hold the current ticket offset.
  uint64_t getOffset(const uint64_t state) const noexcept {
    return state >> kSeqlockBits;
  }

  int getNumClosed(const uint64_t state) const noexcept {
    return (state & ((1 << kSeqlockBits) - 1)) >> 1;
  }
536 | |
537 | /// Try to expand the queue. Returns true if this expansion was |
  /// successful or a concurrent expansion is in progress. Returns
539 | /// false if the queue has reached its maximum capacity or |
540 | /// allocation has failed. |
541 | bool tryExpand(const uint64_t state, const size_t cap) noexcept { |
542 | if (cap == this->capacity_) { |
543 | return false; |
544 | } |
545 | // Acquire seqlock |
546 | uint64_t oldval = state; |
547 | assert((state & 1) == 0); |
548 | if (this->dstate_.compare_exchange_strong(oldval, state + 1)) { |
549 | assert(cap == this->dcapacity_.load()); |
550 | uint64_t ticket = |
551 | 1 + std::max(this->pushTicket_.load(), this->popTicket_.load()); |
552 | size_t newCapacity = std::min(dmult_ * cap, this->capacity_); |
553 | Slot* newSlots = |
554 | new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; |
555 | if (newSlots == nullptr) { |
556 | // Expansion failed. Restore the seqlock |
557 | this->dstate_.store(state); |
558 | return false; |
559 | } |
560 | // Successful expansion |
561 | // calculate the current ticket offset |
562 | uint64_t offset = getOffset(state); |
563 | // calculate index in closed array |
564 | int index = getNumClosed(state); |
565 | assert((index << 1) < (1 << kSeqlockBits)); |
566 | // fill the info for the closed slots array |
567 | closed_[index].offset_ = offset; |
568 | closed_[index].slots_ = this->dslots_.load(); |
569 | closed_[index].capacity_ = cap; |
570 | closed_[index].stride_ = this->dstride_.load(); |
571 | // update the new slots array info |
572 | this->dslots_.store(newSlots); |
573 | this->dcapacity_.store(newCapacity); |
574 | this->dstride_.store(this->computeStride(newCapacity)); |
575 | // Release the seqlock and record the new ticket offset |
576 | this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1))); |
577 | return true; |
578 | } else { // failed to acquire seqlock |
      // Someone acquired the seqlock. Go back to the caller and get
580 | // up-to-date info. |
581 | return true; |
582 | } |
583 | } |
584 | |
585 | /// Seqlock read-only section |
586 | bool trySeqlockReadSection( |
587 | uint64_t& state, |
588 | Slot*& slots, |
589 | size_t& cap, |
590 | int& stride) noexcept { |
591 | state = this->dstate_.load(std::memory_order_acquire); |
592 | if (state & 1) { |
593 | // Locked. |
594 | return false; |
595 | } |
596 | // Start read-only section. |
597 | slots = this->dslots_.load(std::memory_order_relaxed); |
598 | cap = this->dcapacity_.load(std::memory_order_relaxed); |
599 | stride = this->dstride_.load(std::memory_order_relaxed); |
600 | // End of read-only section. Validate seqlock. |
601 | std::atomic_thread_fence(std::memory_order_acquire); |
602 | return (state == this->dstate_.load(std::memory_order_relaxed)); |
603 | } |
604 | |
605 | /// If there was an expansion after ticket was issued, update local variables |
606 | /// of the lagging operation using the most recent closed array with |
  /// offset <= ticket and return true. Otherwise, return false.
608 | bool maybeUpdateFromClosed( |
609 | const uint64_t state, |
610 | const uint64_t ticket, |
611 | uint64_t& offset, |
612 | Slot*& slots, |
613 | size_t& cap, |
614 | int& stride) noexcept { |
615 | offset = getOffset(state); |
616 | if (ticket >= offset) { |
617 | return false; |
618 | } |
619 | for (int i = getNumClosed(state) - 1; i >= 0; --i) { |
620 | offset = closed_[i].offset_; |
621 | if (offset <= ticket) { |
622 | slots = closed_[i].slots_; |
623 | cap = closed_[i].capacity_; |
624 | stride = closed_[i].stride_; |
625 | return true; |
626 | } |
627 | } |
628 | // A closed array with offset <= ticket should have been found |
629 | assert(false); |
630 | return false; |
631 | } |
632 | }; |
633 | |
634 | namespace detail { |
635 | |
636 | /// CRTP specialization of MPMCQueueBase |
637 | template < |
638 | template <typename T, template <typename> class Atom, bool Dynamic> |
639 | class Derived, |
640 | typename T, |
641 | template <typename> class Atom, |
642 | bool Dynamic> |
643 | class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable { |
644 | // Note: Using CRTP static casts in several functions of this base |
645 | // template instead of making called functions virtual or duplicating |
646 | // the code of calling functions in the derived partially specialized |
647 | // template |
648 | |
649 | static_assert( |
650 | std::is_nothrow_constructible<T, T&&>::value || |
651 | folly::IsRelocatable<T>::value, |
652 | "T must be relocatable or have a noexcept move constructor" ); |
653 | |
654 | public: |
655 | typedef T value_type; |
656 | |
657 | using Slot = detail::SingleElementQueue<T, Atom>; |
658 | |
659 | explicit MPMCQueueBase(size_t queueCapacity) |
660 | : capacity_(queueCapacity), |
661 | dstate_(0), |
662 | dcapacity_(0), |
663 | pushTicket_(0), |
664 | popTicket_(0), |
665 | pushSpinCutoff_(0), |
666 | popSpinCutoff_(0) { |
667 | if (queueCapacity == 0) { |
668 | throw std::invalid_argument( |
669 | "MPMCQueue with explicit capacity 0 is impossible" |
670 | // Stride computation in derived classes would sigfpe if capacity is 0 |
671 | ); |
672 | } |
673 | |
674 | // ideally this would be a static assert, but g++ doesn't allow it |
675 | assert( |
676 | alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size); |
677 | assert( |
678 | static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) - |
679 | static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >= |
680 | static_cast<ptrdiff_t>(hardware_destructive_interference_size)); |
681 | } |
682 | |
683 | /// A default-constructed queue is useful because a usable (non-zero |
684 | /// capacity) queue can be moved onto it or swapped with it |
685 | MPMCQueueBase() noexcept |
686 | : capacity_(0), |
687 | slots_(nullptr), |
688 | stride_(0), |
689 | dstate_(0), |
690 | dcapacity_(0), |
691 | pushTicket_(0), |
692 | popTicket_(0), |
693 | pushSpinCutoff_(0), |
694 | popSpinCutoff_(0) {} |
695 | |
696 | /// IMPORTANT: The move constructor is here to make it easier to perform |
697 | /// the initialization phase, it is not safe to use when there are any |
698 | /// concurrent accesses (this is not checked). |
699 | MPMCQueueBase(MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) noexcept |
700 | : capacity_(rhs.capacity_), |
701 | slots_(rhs.slots_), |
702 | stride_(rhs.stride_), |
703 | dstate_(rhs.dstate_.load(std::memory_order_relaxed)), |
704 | dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)), |
705 | pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)), |
706 | popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)), |
707 | pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)), |
708 | popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed)) { |
709 | // relaxed ops are okay for the previous reads, since rhs queue can't |
710 | // be in concurrent use |
711 | |
712 | // zero out rhs |
713 | rhs.capacity_ = 0; |
714 | rhs.slots_ = nullptr; |
715 | rhs.stride_ = 0; |
716 | rhs.dstate_.store(0, std::memory_order_relaxed); |
717 | rhs.dcapacity_.store(0, std::memory_order_relaxed); |
718 | rhs.pushTicket_.store(0, std::memory_order_relaxed); |
719 | rhs.popTicket_.store(0, std::memory_order_relaxed); |
720 | rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed); |
721 | rhs.popSpinCutoff_.store(0, std::memory_order_relaxed); |
722 | } |
723 | |
724 | /// IMPORTANT: The move operator is here to make it easier to perform |
725 | /// the initialization phase, it is not safe to use when there are any |
726 | /// concurrent accesses (this is not checked). |
727 | MPMCQueueBase<Derived<T, Atom, Dynamic>> const& operator=( |
728 | MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) { |
729 | if (this != &rhs) { |
730 | this->~MPMCQueueBase(); |
731 | new (this) MPMCQueueBase(std::move(rhs)); |
732 | } |
733 | return *this; |
734 | } |
735 | |
736 | /// MPMCQueue can only be safely destroyed when there are no |
737 | /// pending enqueuers or dequeuers (this is not checked). |
738 | ~MPMCQueueBase() { |
739 | delete[] slots_; |
740 | } |
741 | |
742 | /// Returns the number of writes (including threads that are blocked waiting |
743 | /// to write) minus the number of reads (including threads that are blocked |
744 | /// waiting to read). So effectively, it becomes: |
745 | /// elements in queue + pending(calls to write) - pending(calls to read). |
746 | /// If nothing is pending, then the method returns the actual number of |
747 | /// elements in the queue. |
748 | /// The returned value can be negative if there are no writers and the queue |
749 | /// is empty, but there is one reader that is blocked waiting to read (in |
750 | /// which case, the returned size will be -1). |
751 | ssize_t size() const noexcept { |
752 | // since both pushes and pops increase monotonically, we can get a |
753 | // consistent snapshot either by bracketing a read of popTicket_ with |
754 | // two reads of pushTicket_ that return the same value, or the other |
755 | // way around. We maximize our chances by alternately attempting |
756 | // both bracketings. |
757 | uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A |
758 | uint64_t pops = popTicket_.load(std::memory_order_acquire); // B |
759 | while (true) { |
760 | uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C |
761 | if (pushes == nextPushes) { |
762 | // pushTicket_ didn't change from A (or the previous C) to C, |
763 | // so we can linearize at B (or D) |
764 | return ssize_t(pushes - pops); |
765 | } |
766 | pushes = nextPushes; |
767 | uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D |
768 | if (pops == nextPops) { |
      // popTicket_ didn't change from B (or the previous D), so we
770 | // can linearize at C |
771 | return ssize_t(pushes - pops); |
772 | } |
773 | pops = nextPops; |
774 | } |
775 | } |
776 | |
777 | /// Returns true if there are no items available for dequeue |
778 | bool isEmpty() const noexcept { |
779 | return size() <= 0; |
780 | } |
781 | |
782 | /// Returns true if there is currently no empty space to enqueue |
783 | bool isFull() const noexcept { |
784 | // careful with signed -> unsigned promotion, since size can be negative |
785 | return size() >= static_cast<ssize_t>(capacity_); |
786 | } |
787 | |
  /// Returns a guess at size() for contexts that don't need a precise
  /// value, such as stats. More specifically, it returns the number of
  /// writes minus the number of reads, but after reading the number of
  /// writes, more writes could have come in before the number of reads
  /// was sampled; this method doesn't protect against such a case.
793 | /// The returned value can be negative. |
794 | ssize_t sizeGuess() const noexcept { |
795 | return writeCount() - readCount(); |
796 | } |
797 | |
798 | /// Doesn't change |
799 | size_t capacity() const noexcept { |
800 | return capacity_; |
801 | } |
802 | |
803 | /// Doesn't change for non-dynamic |
804 | size_t allocatedCapacity() const noexcept { |
805 | return capacity_; |
806 | } |
807 | |
808 | /// Returns the total number of calls to blockingWrite or successful |
809 | /// calls to write, including those blockingWrite calls that are |
810 | /// currently blocking |
811 | uint64_t writeCount() const noexcept { |
812 | return pushTicket_.load(std::memory_order_acquire); |
813 | } |
814 | |
815 | /// Returns the total number of calls to blockingRead or successful |
816 | /// calls to read, including those blockingRead calls that are currently |
817 | /// blocking |
818 | uint64_t readCount() const noexcept { |
819 | return popTicket_.load(std::memory_order_acquire); |
820 | } |
821 | |
822 | /// Enqueues a T constructed from args, blocking until space is |
823 | /// available. Note that this method signature allows enqueue via |
824 | /// move, if args is a T rvalue, via copy, if args is a T lvalue, or |
825 | /// via emplacement if args is an initializer list that can be passed |
826 | /// to a T constructor. |
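  ///
  /// A sketch of the three forms (illustrative; Point is a hypothetical
  /// type with noexcept constructors):
  ///
  ///   struct Point {
  ///     int x, y;
  ///     Point(int x_, int y_) noexcept : x(x_), y(y_) {}
  ///   };
  ///   folly::MPMCQueue<Point> q(8);
  ///   Point p(1, 2);
  ///   q.blockingWrite(p);            // copy; Point's copy ctor is noexcept
  ///   q.blockingWrite(Point(3, 4));  // move
  ///   q.blockingWrite(5, 6);         // emplace from constructor args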
827 | template <typename... Args> |
828 | void blockingWrite(Args&&... args) noexcept { |
829 | enqueueWithTicketBase( |
830 | pushTicket_++, slots_, capacity_, stride_, std::forward<Args>(args)...); |
831 | } |
832 | |
833 | /// If an item can be enqueued with no blocking, does so and returns |
834 | /// true, otherwise returns false. This method is similar to |
835 | /// writeIfNotFull, but if you don't have a specific need for that |
836 | /// method you should use this one. |
837 | /// |
838 | /// One of the common usages of this method is to enqueue via the |
839 | /// move constructor, something like q.write(std::move(x)). If write |
840 | /// returns false because the queue is full then x has not actually been |
841 | /// consumed, which looks strange. To understand why it is actually okay |
842 | /// to use x afterward, remember that std::move is just a typecast that |
843 | /// provides an rvalue reference that enables use of a move constructor |
844 | /// or operator. std::move doesn't actually move anything. It could |
845 | /// more accurately be called std::rvalue_cast or std::move_permission. |
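  ///
  /// A sketch of that failure case (illustrative):
  ///
  ///   folly::MPMCQueue<std::vector<int>> q(4);
  ///   std::vector<int> x = {1, 2, 3};
  ///   if (!q.write(std::move(x))) {
  ///     // q was full: nothing was moved, x still owns {1, 2, 3}
  ///   }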
846 | template <typename... Args> |
847 | bool write(Args&&... args) noexcept { |
848 | uint64_t ticket; |
849 | Slot* slots; |
850 | size_t cap; |
851 | int stride; |
852 | if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPushTicket( |
853 | ticket, slots, cap, stride)) { |
854 | // we have pre-validated that the ticket won't block |
855 | enqueueWithTicketBase( |
856 | ticket, slots, cap, stride, std::forward<Args>(args)...); |
857 | return true; |
858 | } else { |
859 | return false; |
860 | } |
861 | } |
862 | |
  /// Like writeIfNotFull, but instead of failing immediately when full,
  /// waits until the deadline `when` to obtain a ticket. Returns true if
  /// the element was enqueued, false if the deadline passed first.
  template <class Clock, typename... Args>
  bool tryWriteUntil(
      const std::chrono::time_point<Clock>& when,
      Args&&... args) noexcept {
867 | uint64_t ticket; |
868 | Slot* slots; |
869 | size_t cap; |
870 | int stride; |
871 | if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) { |
872 | // we have pre-validated that the ticket won't block, or rather that |
873 | // it won't block longer than it takes another thread to dequeue an |
874 | // element from the slot it identifies. |
875 | enqueueWithTicketBase( |
876 | ticket, slots, cap, stride, std::forward<Args>(args)...); |
877 | return true; |
878 | } else { |
879 | return false; |
880 | } |
881 | } |
882 | |
883 | /// If the queue is not full, enqueues and returns true, otherwise |
884 | /// returns false. Unlike write this method can be blocked by another |
885 | /// thread, specifically a read that has linearized (been assigned |
886 | /// a ticket) but not yet completed. If you don't really need this |
887 | /// function you should probably use write. |
888 | /// |
889 | /// MPMCQueue isn't lock-free, so just because a read operation has |
890 | /// linearized (and isFull is false) doesn't mean that space has been |
891 | /// made available for another write. In this situation write will |
892 | /// return false, but writeIfNotFull will wait for the dequeue to finish. |
893 | /// This method is required if you are composing queues and managing |
894 | /// your own wakeup, because it guarantees that after every successful |
895 | /// write a readIfNotEmpty will succeed. |
896 | template <typename... Args> |
897 | bool writeIfNotFull(Args&&... args) noexcept { |
898 | uint64_t ticket; |
899 | Slot* slots; |
900 | size_t cap; |
901 | int stride; |
902 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
903 | ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) { |
904 | // some other thread is already dequeuing the slot into which we |
905 | // are going to enqueue, but we might have to wait for them to finish |
906 | enqueueWithTicketBase( |
907 | ticket, slots, cap, stride, std::forward<Args>(args)...); |
908 | return true; |
909 | } else { |
910 | return false; |
911 | } |
912 | } |
913 | |
914 | /// Moves a dequeued element onto elem, blocking until an element |
915 | /// is available |
916 | void blockingRead(T& elem) noexcept { |
917 | uint64_t ticket; |
918 | static_cast<Derived<T, Atom, Dynamic>*>(this)->blockingReadWithTicket( |
919 | ticket, elem); |
920 | } |
921 | |
  /// Same as blockingRead() but also records the ticket number
923 | void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept { |
924 | assert(capacity_ != 0); |
925 | ticket = popTicket_++; |
926 | dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem); |
927 | } |
928 | |
929 | /// If an item can be dequeued with no blocking, does so and returns |
930 | /// true, otherwise returns false. |
931 | bool read(T& elem) noexcept { |
932 | uint64_t ticket; |
933 | return readAndGetTicket(ticket, elem); |
934 | } |
935 | |
  /// Same as read() but also records the ticket number
937 | bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept { |
938 | Slot* slots; |
939 | size_t cap; |
940 | int stride; |
941 | if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPopTicket( |
942 | ticket, slots, cap, stride)) { |
943 | // the ticket has been pre-validated to not block |
944 | dequeueWithTicketBase(ticket, slots, cap, stride, elem); |
945 | return true; |
946 | } else { |
947 | return false; |
948 | } |
949 | } |
950 | |
  /// Like readIfNotEmpty, but instead of failing immediately when empty,
  /// waits until the deadline `when` to obtain a ticket. Returns true if
  /// an element was dequeued into elem, false if the deadline passed
  /// first.
  template <class Clock, typename... Args>
  bool tryReadUntil(
      const std::chrono::time_point<Clock>& when,
      T& elem) noexcept {
955 | uint64_t ticket; |
956 | Slot* slots; |
957 | size_t cap; |
958 | int stride; |
959 | if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) { |
960 | // we have pre-validated that the ticket won't block, or rather that |
961 | // it won't block longer than it takes another thread to enqueue an |
962 | // element on the slot it identifies. |
963 | dequeueWithTicketBase(ticket, slots, cap, stride, elem); |
964 | return true; |
965 | } else { |
966 | return false; |
967 | } |
968 | } |
969 | |
970 | /// If the queue is not empty, dequeues and returns true, otherwise |
971 | /// returns false. If the matching write is still in progress then this |
972 | /// method may block waiting for it. If you don't rely on being able |
  /// to dequeue (such as by counting completed writes) then you should
974 | /// prefer read. |
975 | bool readIfNotEmpty(T& elem) noexcept { |
976 | uint64_t ticket; |
977 | Slot* slots; |
978 | size_t cap; |
979 | int stride; |
980 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
981 | ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) { |
982 | // the matching enqueue already has a ticket, but might not be done |
983 | dequeueWithTicketBase(ticket, slots, cap, stride, elem); |
984 | return true; |
985 | } else { |
986 | return false; |
987 | } |
988 | } |
989 | |
990 | protected: |
991 | enum { |
992 | /// Once every kAdaptationFreq we will spin longer, to try to estimate |
993 | /// the proper spin backoff |
994 | kAdaptationFreq = 128, |
995 | |
996 | /// To avoid false sharing in slots_ with neighboring memory |
997 | /// allocations, we pad it with this many SingleElementQueue-s at |
998 | /// each end |
999 | kSlotPadding = |
1000 | (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1 |
1001 | }; |
1002 | |
1003 | /// The maximum number of items in the queue at once |
1004 | alignas(hardware_destructive_interference_size) size_t capacity_; |
1005 | |
1006 | /// Anonymous union for use when Dynamic = false and true, respectively |
1007 | union { |
1008 | /// An array of capacity_ SingleElementQueue-s, each of which holds |
1009 | /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't |
1010 | /// touch the slots at either end, to avoid false sharing |
1011 | Slot* slots_; |
1012 | /// Current dynamic slots array of dcapacity_ SingleElementQueue-s |
1013 | Atom<Slot*> dslots_; |
1014 | }; |
1015 | |
1016 | /// Anonymous union for use when Dynamic = false and true, respectively |
1017 | union { |
1018 | /// The number of slots_ indices that we advance for each ticket, to |
1019 | /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_] |
1020 | /// aren't on the same cache line |
1021 | int stride_; |
1022 | /// Current stride |
1023 | Atom<int> dstride_; |
1024 | }; |
1025 | |
  /// The following two members are used by dynamic MPMCQueue.
1027 | /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get |
1028 | /// better cache locality if they are in the same cache line as |
1029 | /// dslots_ and dstride_. |
1030 | /// |
1031 | /// Dynamic state. A packed seqlock and ticket offset |
1032 | Atom<uint64_t> dstate_; |
1033 | /// Dynamic capacity |
1034 | Atom<size_t> dcapacity_; |
1035 | |
1036 | /// Enqueuers get tickets from here |
1037 | alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_; |
1038 | |
1039 | /// Dequeuers get tickets from here |
1040 | alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_; |
1041 | |
1042 | /// This is how many times we will spin before using FUTEX_WAIT when |
1043 | /// the queue is full on enqueue, adaptively computed by occasionally |
1044 | /// spinning for longer and smoothing with an exponential moving average |
1045 | alignas( |
1046 | hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_; |
1047 | |
1048 | /// The adaptive spin cutoff when the queue is empty on dequeue |
1049 | alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_; |
1050 | |
1051 | /// Alignment doesn't prevent false sharing at the end of the struct, |
1052 | /// so fill out the last cache line |
1053 | char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)]; |
1054 | |
1055 | /// We assign tickets in increasing order, but we don't want to |
1056 | /// access neighboring elements of slots_ because that will lead to |
1057 | /// false sharing (multiple cores accessing the same cache line even |
1058 | /// though they aren't accessing the same bytes in that cache line). |
1059 | /// To avoid this we advance by stride slots per ticket. |
1060 | /// |
1061 | /// We need gcd(capacity, stride) to be 1 so that we will use all |
1062 | /// of the slots. We ensure this by only considering prime strides, |
1063 | /// which either have no common divisors with capacity or else have |
1064 | /// a zero remainder after dividing by capacity. That is sufficient |
1065 | /// to guarantee correctness, but we also want to actually spread the |
1066 | /// accesses away from each other to avoid false sharing (consider a |
1067 | /// stride of 7 with a capacity of 8). To that end we try a few taking |
1068 | /// care to observe that advancing by -1 is as bad as advancing by 1 |
  /// when it comes to false sharing.
1070 | /// |
1071 | /// The simple way to avoid false sharing would be to pad each |
1072 | /// SingleElementQueue, but since we have capacity_ of them that could |
1073 | /// waste a lot of space. |
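  ///
  /// Worked example (derived by hand from the loop below): with capacity
  /// 10, strides 2 and 5 are skipped because they divide 10; stride 3
  /// gives separation min(3, 10 - 3) = 3, and no later prime does better
  /// (11 and 19 give 1; 7, 13, 17, and 23 also give 3), so
  /// computeStride(10) returns 3.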
1074 | static int computeStride(size_t capacity) noexcept { |
1075 | static const int smallPrimes[] = {2, 3, 5, 7, 11, 13, 17, 19, 23}; |
1076 | |
1077 | int bestStride = 1; |
1078 | size_t bestSep = 1; |
1079 | for (int stride : smallPrimes) { |
1080 | if ((stride % capacity) == 0 || (capacity % stride) == 0) { |
1081 | continue; |
1082 | } |
1083 | size_t sep = stride % capacity; |
1084 | sep = std::min(sep, capacity - sep); |
1085 | if (sep > bestSep) { |
1086 | bestStride = stride; |
1087 | bestSep = sep; |
1088 | } |
1089 | } |
1090 | return bestStride; |
1091 | } |
1092 | |
1093 | /// Returns the index into slots_ that should be used when enqueuing or |
1094 | /// dequeuing with the specified ticket |
1095 | size_t idx(uint64_t ticket, size_t cap, int stride) noexcept { |
1096 | return ((ticket * stride) % cap) + kSlotPadding; |
1097 | } |
1098 | |
  /// Maps an enqueue or dequeue ticket to the turn that should be used
  /// at the corresponding SingleElementQueue
1101 | uint32_t turn(uint64_t ticket, size_t cap) noexcept { |
1102 | assert(cap != 0); |
1103 | return uint32_t(ticket / cap); |
1104 | } |
1105 | |
1106 | /// Tries to obtain a push ticket for which SingleElementQueue::enqueue |
1107 | /// won't block. Returns true on immediate success, false on immediate |
1108 | /// failure. |
1109 | bool tryObtainReadyPushTicket( |
1110 | uint64_t& ticket, |
1111 | Slot*& slots, |
1112 | size_t& cap, |
1113 | int& stride) noexcept { |
1114 | ticket = pushTicket_.load(std::memory_order_acquire); // A |
1115 | slots = slots_; |
1116 | cap = capacity_; |
1117 | stride = stride_; |
1118 | while (true) { |
1119 | if (!slots[idx(ticket, cap, stride)].mayEnqueue(turn(ticket, cap))) { |
1120 | // if we call enqueue(ticket, ...) on the SingleElementQueue |
1121 | // right now it would block, but this might no longer be the next |
1122 | // ticket. We can increase the chance of tryEnqueue success under |
1123 | // contention (without blocking) by rechecking the ticket dispenser |
1124 | auto prev = ticket; |
1125 | ticket = pushTicket_.load(std::memory_order_acquire); // B |
1126 | if (prev == ticket) { |
1127 | // mayEnqueue was bracketed by two reads (A or prev B or prev |
1128 | // failing CAS to B), so we are definitely unable to enqueue |
1129 | return false; |
1130 | } |
1131 | } else { |
1132 | // we will bracket the mayEnqueue check with a read (A or prev B |
1133 | // or prev failing CAS) and the following CAS. If the CAS fails |
1134 | // it will effect a load of pushTicket_ |
1135 | if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
1136 | return true; |
1137 | } |
1138 | } |
1139 | } |
1140 | } |
1141 | |
1142 | /// Tries until when to obtain a push ticket for which |
1143 | /// SingleElementQueue::enqueue won't block. Returns true on success, false |
1144 | /// on failure. |
1145 | /// ticket is filled on success AND failure. |
1146 | template <class Clock> |
1147 | bool tryObtainPromisedPushTicketUntil( |
1148 | uint64_t& ticket, |
1149 | Slot*& slots, |
1150 | size_t& cap, |
1151 | int& stride, |
1152 | const std::chrono::time_point<Clock>& when) noexcept { |
1153 | bool deadlineReached = false; |
1154 | while (!deadlineReached) { |
1155 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
1156 | ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) { |
1157 | return true; |
1158 | } |
1159 | // ticket is a blocking ticket until the preceding ticket has been |
1160 | // processed: wait until this ticket's turn arrives. We have not reserved |
1161 | // this ticket so we will have to re-attempt to get a non-blocking ticket |
1162 | // if we wake up before we time-out. |
1163 | deadlineReached = |
1164 | !slots[idx(ticket, cap, stride)].tryWaitForEnqueueTurnUntil( |
1165 | turn(ticket, cap), |
1166 | pushSpinCutoff_, |
1167 | (ticket % kAdaptationFreq) == 0, |
1168 | when); |
1169 | } |
1170 | return false; |
1171 | } |
1172 | |
1173 | /// Tries to obtain a push ticket which can be satisfied if all |
1174 | /// in-progress pops complete. This function does not block, but |
1175 | /// blocking may be required when using the returned ticket if some |
1176 | /// other thread's pop is still in progress (ticket has been granted but |
1177 | /// pop has not yet completed). |
1178 | bool tryObtainPromisedPushTicket( |
1179 | uint64_t& ticket, |
1180 | Slot*& slots, |
1181 | size_t& cap, |
1182 | int& stride) noexcept { |
1183 | auto numPushes = pushTicket_.load(std::memory_order_acquire); // A |
1184 | slots = slots_; |
1185 | cap = capacity_; |
1186 | stride = stride_; |
1187 | while (true) { |
1188 | ticket = numPushes; |
1189 | const auto numPops = popTicket_.load(std::memory_order_acquire); // B |
1190 | // n will be negative if pops are pending |
1191 | const int64_t n = int64_t(numPushes - numPops); |
1192 | if (n >= static_cast<ssize_t>(capacity_)) { |
1193 | // Full, linearize at B. We don't need to recheck the read we |
1194 | // performed at A, because if numPushes was stale at B then the |
1195 | // real numPushes value is even worse |
1196 | return false; |
1197 | } |
1198 | if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) { |
1199 | return true; |
1200 | } |
1201 | } |
1202 | } |
1203 | |
1204 | /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue |
1205 | /// won't block. Returns true on immediate success, false on immediate |
1206 | /// failure. |
1207 | bool tryObtainReadyPopTicket( |
1208 | uint64_t& ticket, |
1209 | Slot*& slots, |
1210 | size_t& cap, |
1211 | int& stride) noexcept { |
1212 | ticket = popTicket_.load(std::memory_order_acquire); |
1213 | slots = slots_; |
1214 | cap = capacity_; |
1215 | stride = stride_; |
1216 | while (true) { |
1217 | if (!slots[idx(ticket, cap, stride)].mayDequeue(turn(ticket, cap))) { |
1218 | auto prev = ticket; |
1219 | ticket = popTicket_.load(std::memory_order_acquire); |
1220 | if (prev == ticket) { |
1221 | return false; |
1222 | } |
1223 | } else { |
1224 | if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
1225 | return true; |
1226 | } |
1227 | } |
1228 | } |
1229 | } |
1230 | |
1231 | /// Tries until when to obtain a pop ticket for which |
1232 | /// SingleElementQueue::dequeue won't block. Returns true on success, false |
1233 | /// on failure. |
1234 | /// ticket is filled on success AND failure. |
1235 | template <class Clock> |
1236 | bool tryObtainPromisedPopTicketUntil( |
1237 | uint64_t& ticket, |
1238 | Slot*& slots, |
1239 | size_t& cap, |
1240 | int& stride, |
1241 | const std::chrono::time_point<Clock>& when) noexcept { |
1242 | bool deadlineReached = false; |
1243 | while (!deadlineReached) { |
1244 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
1245 | ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) { |
1246 | return true; |
1247 | } |
1248 | // ticket is a blocking ticket until the preceding ticket has been |
1249 | // processed: wait until this ticket's turn arrives. We have not reserved |
1250 | // this ticket so we will have to re-attempt to get a non-blocking ticket |
1251 | // if we wake up before we time-out. |
1252 | deadlineReached = |
1253 | !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil( |
1254 | turn(ticket, cap), |
              popSpinCutoff_,
1256 | (ticket % kAdaptationFreq) == 0, |
1257 | when); |
1258 | } |
1259 | return false; |
1260 | } |
1261 | |
1262 | /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose |
1263 | /// corresponding push ticket has already been handed out, rather than |
1264 | /// returning one whose corresponding push ticket has already been |
1265 | /// completed. This means that there is a possibility that the caller |
1266 | /// will block when using the ticket, but it allows the user to rely on |
1267 | /// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket |
1268 | /// will return true. The "try" part of this is that we won't have |
1269 | /// to block waiting for someone to call enqueue, although we might |
1270 | /// have to block waiting for them to finish executing code inside the |
1271 | /// MPMCQueue itself. |
1272 | bool tryObtainPromisedPopTicket( |
1273 | uint64_t& ticket, |
1274 | Slot*& slots, |
1275 | size_t& cap, |
1276 | int& stride) noexcept { |
1277 | auto numPops = popTicket_.load(std::memory_order_acquire); // A |
1278 | slots = slots_; |
1279 | cap = capacity_; |
1280 | stride = stride_; |
1281 | while (true) { |
1282 | ticket = numPops; |
1283 | const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B |
1284 | if (numPops >= numPushes) { |
1285 | // Empty, or empty with pending pops. Linearize at B. We don't |
1286 | // need to recheck the read we performed at A, because if numPops |
1287 | // is stale then the fresh value is larger and the >= is still true |
1288 | return false; |
1289 | } |
1290 | if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) { |
1291 | return true; |
1292 | } |
1293 | } |
1294 | } |
1295 | |
1296 | // Given a ticket, constructs an enqueued item using args |
1297 | template <typename... Args> |
1298 | void enqueueWithTicketBase( |
1299 | uint64_t ticket, |
1300 | Slot* slots, |
1301 | size_t cap, |
1302 | int stride, |
1303 | Args&&... args) noexcept { |
1304 | slots[idx(ticket, cap, stride)].enqueue( |
1305 | turn(ticket, cap), |
1306 | pushSpinCutoff_, |
1307 | (ticket % kAdaptationFreq) == 0, |
1308 | std::forward<Args>(args)...); |
1309 | } |
1310 | |
1311 | // To support tracking ticket numbers in MPMCPipelineStageImpl |
1312 | template <typename... Args> |
1313 | void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept { |
1314 | enqueueWithTicketBase( |
1315 | ticket, slots_, capacity_, stride_, std::forward<Args>(args)...); |
1316 | } |
1317 | |
1318 | // Given a ticket, dequeues the corresponding element |
1319 | void dequeueWithTicketBase( |
1320 | uint64_t ticket, |
1321 | Slot* slots, |
1322 | size_t cap, |
1323 | int stride, |
1324 | T& elem) noexcept { |
1325 | assert(cap != 0); |
1326 | slots[idx(ticket, cap, stride)].dequeue( |
1327 | turn(ticket, cap), |
1328 | popSpinCutoff_, |
1329 | (ticket % kAdaptationFreq) == 0, |
1330 | elem); |
1331 | } |
1332 | }; |
1333 | |
1334 | /// SingleElementQueue implements a blocking queue that holds at most one |
1335 | /// item, and that requires its users to assign incrementing identifiers |
1336 | /// (turns) to each enqueue and dequeue operation. Note that the turns |
1337 | /// used by SingleElementQueue are doubled inside the TurnSequencer |
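///
/// For example (illustrative): the i-th enqueue at a slot passes turn i,
/// which maps to sequencer turn 2*i, and the i-th dequeue also passes
/// turn i, mapping to sequencer turn 2*i + 1, so pushes and pops at each
/// slot strictly alternate.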
1338 | template <typename T, template <typename> class Atom> |
1339 | struct SingleElementQueue { |
1340 | ~SingleElementQueue() noexcept { |
1341 | if ((sequencer_.uncompletedTurnLSB() & 1) == 1) { |
1342 | // we are pending a dequeue, so we have a constructed item |
1343 | destroyContents(); |
1344 | } |
1345 | } |
1346 | |
1347 | /// enqueue using in-place noexcept construction |
1348 | template < |
1349 | typename... Args, |
1350 | typename = typename std::enable_if< |
1351 | std::is_nothrow_constructible<T, Args...>::value>::type> |
1352 | void enqueue( |
1353 | const uint32_t turn, |
1354 | Atom<uint32_t>& spinCutoff, |
1355 | const bool updateSpinCutoff, |
1356 | Args&&... args) noexcept { |
1357 | sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); |
1358 | new (&contents_) T(std::forward<Args>(args)...); |
1359 | sequencer_.completeTurn(turn * 2); |
1360 | } |
1361 | |
1362 | /// enqueue using move construction, either real (if |
1363 | /// is_nothrow_move_constructible) or simulated using relocation and |
1364 | /// default construction (if IsRelocatable and is_nothrow_constructible) |
1365 | template < |
1366 | typename = typename std::enable_if< |
1367 | (folly::IsRelocatable<T>::value && |
1368 | std::is_nothrow_constructible<T>::value) || |
1369 | std::is_nothrow_constructible<T, T&&>::value>::type> |
1370 | void enqueue( |
1371 | const uint32_t turn, |
1372 | Atom<uint32_t>& spinCutoff, |
1373 | const bool updateSpinCutoff, |
1374 | T&& goner) noexcept { |
1375 | enqueueImpl( |
1376 | turn, |
1377 | spinCutoff, |
1378 | updateSpinCutoff, |
1379 | std::move(goner), |
1380 | typename std::conditional< |
1381 | std::is_nothrow_constructible<T, T&&>::value, |
1382 | ImplByMove, |
1383 | ImplByRelocation>::type()); |
1384 | } |
1385 | |
1386 | /// Waits until either: |
1387 | /// 1: the dequeue turn preceding the given enqueue turn has arrived |
1388 | /// 2: the given deadline has arrived |
1389 | /// Case 1 returns true, case 2 returns false. |
1390 | template <class Clock> |
1391 | bool tryWaitForEnqueueTurnUntil( |
1392 | const uint32_t turn, |
1393 | Atom<uint32_t>& spinCutoff, |
1394 | const bool updateSpinCutoff, |
1395 | const std::chrono::time_point<Clock>& when) noexcept { |
1396 | return sequencer_.tryWaitForTurn( |
1397 | turn * 2, spinCutoff, updateSpinCutoff, &when) != |
1398 | TurnSequencer<Atom>::TryWaitResult::TIMEDOUT; |
1399 | } |
1400 | |
1401 | bool mayEnqueue(const uint32_t turn) const noexcept { |
1402 | return sequencer_.isTurn(turn * 2); |
1403 | } |
1404 | |
1405 | void dequeue( |
1406 | uint32_t turn, |
1407 | Atom<uint32_t>& spinCutoff, |
1408 | const bool updateSpinCutoff, |
1409 | T& elem) noexcept { |
1410 | dequeueImpl( |
1411 | turn, |
1412 | spinCutoff, |
1413 | updateSpinCutoff, |
1414 | elem, |
1415 | typename std::conditional< |
1416 | folly::IsRelocatable<T>::value, |
1417 | ImplByRelocation, |
1418 | ImplByMove>::type()); |
1419 | } |
1420 | |
1421 | /// Waits until either: |
1422 | /// 1: the enqueue turn preceding the given dequeue turn has arrived |
1423 | /// 2: the given deadline has arrived |
1424 | /// Case 1 returns true, case 2 returns false. |
1425 | template <class Clock> |
1426 | bool tryWaitForDequeueTurnUntil( |
1427 | const uint32_t turn, |
1428 | Atom<uint32_t>& spinCutoff, |
1429 | const bool updateSpinCutoff, |
1430 | const std::chrono::time_point<Clock>& when) noexcept { |
1431 | return sequencer_.tryWaitForTurn( |
1432 | turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) != |
1433 | TurnSequencer<Atom>::TryWaitResult::TIMEDOUT; |
1434 | } |
1435 | |
1436 | bool mayDequeue(const uint32_t turn) const noexcept { |
1437 | return sequencer_.isTurn(turn * 2 + 1); |
1438 | } |
1439 | |
1440 | private: |
1441 | /// Storage for a T constructed with placement new |
1442 | aligned_storage_for_t<T> contents_; |
1443 | |
1444 | /// Even turns are pushes, odd turns are pops |
1445 | TurnSequencer<Atom> sequencer_; |
1446 | |
1447 | T* ptr() noexcept { |
1448 | return static_cast<T*>(static_cast<void*>(&contents_)); |
1449 | } |
1450 | |
1451 | void destroyContents() noexcept { |
1452 | try { |
1453 | ptr()->~T(); |
1454 | } catch (...) { |
1455 | // g++ doesn't seem to have std::is_nothrow_destructible yet |
1456 | } |
1457 | #ifndef NDEBUG |
1458 | memset(&contents_, 'Q', sizeof(T)); |
1459 | #endif |
1460 | } |
1461 | |
1462 | /// Tag classes for dispatching to enqueue/dequeue implementation. |
1463 | struct ImplByRelocation {}; |
1464 | struct ImplByMove {}; |
1465 | |
1466 | /// enqueue using nothrow move construction. |
1467 | void enqueueImpl( |
1468 | const uint32_t turn, |
1469 | Atom<uint32_t>& spinCutoff, |
1470 | const bool updateSpinCutoff, |
1471 | T&& goner, |
1472 | ImplByMove) noexcept { |
1473 | sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); |
1474 | new (&contents_) T(std::move(goner)); |
1475 | sequencer_.completeTurn(turn * 2); |
1476 | } |
1477 | |
  /// enqueue by simulating a nothrow move with relocation (memcpy),
  /// followed by default construction of the source so that its
  /// destructor runs harmlessly.
1480 | void enqueueImpl( |
1481 | const uint32_t turn, |
1482 | Atom<uint32_t>& spinCutoff, |
1483 | const bool updateSpinCutoff, |
1484 | T&& goner, |
1485 | ImplByRelocation) noexcept { |
1486 | sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); |
1487 | memcpy( |
1488 | static_cast<void*>(&contents_), |
1489 | static_cast<void const*>(&goner), |
1490 | sizeof(T)); |
1491 | sequencer_.completeTurn(turn * 2); |
1492 | new (&goner) T(); |
1493 | } |
1494 | |
1495 | /// dequeue by destructing followed by relocation. This version is preferred, |
1496 | /// because as much work as possible can be done before waiting. |
1497 | void dequeueImpl( |
1498 | uint32_t turn, |
1499 | Atom<uint32_t>& spinCutoff, |
1500 | const bool updateSpinCutoff, |
1501 | T& elem, |
1502 | ImplByRelocation) noexcept { |
1503 | try { |
1504 | elem.~T(); |
1505 | } catch (...) { |
1506 | // unlikely, but if we don't complete our turn the queue will die |
1507 | } |
1508 | sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); |
1509 | memcpy( |
1510 | static_cast<void*>(&elem), |
1511 | static_cast<void const*>(&contents_), |
1512 | sizeof(T)); |
1513 | sequencer_.completeTurn(turn * 2 + 1); |
1514 | } |
1515 | |
1516 | /// dequeue by nothrow move assignment. |
1517 | void dequeueImpl( |
1518 | uint32_t turn, |
1519 | Atom<uint32_t>& spinCutoff, |
1520 | const bool updateSpinCutoff, |
1521 | T& elem, |
1522 | ImplByMove) noexcept { |
1523 | sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); |
1524 | elem = std::move(*ptr()); |
1525 | destroyContents(); |
1526 | sequencer_.completeTurn(turn * 2 + 1); |
1527 | } |
1528 | }; |
1529 | |
1530 | } // namespace detail |
1531 | |
1532 | } // namespace folly |
1533 | |