1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <atomic> |
20 | #include <chrono> |
21 | #include <memory> |
22 | |
23 | #include <glog/logging.h> |
24 | |
25 | #include <folly/ConstexprMath.h> |
26 | #include <folly/Optional.h> |
27 | #include <folly/Traits.h> |
28 | #include <folly/concurrency/CacheLocality.h> |
29 | #include <folly/lang/Align.h> |
30 | #include <folly/synchronization/Hazptr.h> |
31 | #include <folly/synchronization/SaturatingSemaphore.h> |
32 | #include <folly/synchronization/WaitOptions.h> |
33 | #include <folly/synchronization/detail/Spin.h> |
34 | |
35 | namespace folly { |
36 | |
37 | /// UnboundedQueue supports a variety of options for unbounded |
/// dynamically expanding and shrinking queues, including variations of:
39 | /// - Single vs. multiple producers |
40 | /// - Single vs. multiple consumers |
41 | /// - Blocking vs. spin-waiting |
42 | /// - Non-waiting, timed, and waiting consumer operations. |
43 | /// Producer operations never wait or fail (unless out-of-memory). |
44 | /// |
45 | /// Template parameters: |
46 | /// - T: element type |
47 | /// - SingleProducer: true if there can be only one producer at a |
48 | /// time. |
49 | /// - SingleConsumer: true if there can be only one consumer at a |
50 | /// time. |
51 | /// - MayBlock: true if consumers may block, false if they only |
52 | /// spin. A performance tuning parameter. |
53 | /// - LgSegmentSize (default 8): Log base 2 of number of elements per |
54 | /// segment. A performance tuning parameter. See below. |
55 | /// - LgAlign (default 7): Log base 2 of alignment directive; can be |
56 | /// used to balance scalability (avoidance of false sharing) with |
57 | /// memory efficiency. |
58 | /// |
59 | /// When to use UnboundedQueue: |
60 | /// - If a small bound may lead to deadlock or performance degradation |
61 | /// under bursty patterns. |
62 | /// - If there is no risk of the queue growing too much. |
63 | /// |
64 | /// When not to use UnboundedQueue: |
65 | /// - If there is risk of the queue growing too much and a large bound |
66 | /// is acceptable, then use DynamicBoundedQueue. |
67 | /// - If the queue must not allocate on enqueue or it must have a |
68 | /// small bound, then use fixed-size MPMCQueue or (if non-blocking |
69 | /// SPSC) ProducerConsumerQueue. |
70 | /// |
71 | /// Template Aliases: |
72 | /// USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
73 | /// UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
74 | /// USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
75 | /// UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
76 | /// |
77 | /// Functions: |
78 | /// Producer operations never wait or fail (unless OOM) |
79 | /// void enqueue(const T&); |
80 | /// void enqueue(T&&); |
81 | /// Adds an element to the end of the queue. |
82 | /// |
83 | /// Consumer operations: |
84 | /// void dequeue(T&); |
85 | /// T dequeue(); |
86 | /// Extracts an element from the front of the queue. Waits |
87 | /// until an element is available if needed. |
88 | /// bool try_dequeue(T&); |
89 | /// folly::Optional<T> try_dequeue(); |
90 | /// Tries to extract an element from the front of the queue |
91 | /// if available. |
92 | /// bool try_dequeue_until(T&, time_point& deadline); |
93 | /// folly::Optional<T> try_dequeue_until(time_point& deadline); |
94 | /// Tries to extract an element from the front of the queue |
95 | /// if available until the specified deadline. |
96 | /// bool try_dequeue_for(T&, duration&); |
97 | /// folly::Optional<T> try_dequeue_for(duration&); |
98 | /// Tries to extract an element from the front of the queue if |
99 | /// available until the expiration of the specified duration. |
100 | /// const T* try_peek(); |
101 | /// Returns pointer to the element at the front of the queue |
102 | /// if available, or nullptr if the queue is empty. Only for |
103 | /// SPSC and MPSC. |
104 | /// |
105 | /// Secondary functions: |
106 | /// size_t size(); |
107 | /// Returns an estimate of the size of the queue. |
108 | /// bool empty(); |
109 | /// Returns true only if the queue was empty during the call. |
110 | /// Note: size() and empty() are guaranteed to be accurate only if |
111 | /// the queue is not changed concurrently. |
112 | /// |
113 | /// Usage examples: |
114 | /// @code |
115 | /// /* UMPSC, doesn't block, 1024 int elements per segment */ |
116 | /// UMPSCQueue<int, false, 10> q; |
117 | /// q.enqueue(1); |
118 | /// q.enqueue(2); |
119 | /// q.enqueue(3); |
120 | /// ASSERT_FALSE(q.empty()); |
121 | /// ASSERT_EQ(q.size(), 3); |
122 | /// int v; |
123 | /// q.dequeue(v); |
124 | /// ASSERT_EQ(v, 1); |
125 | /// ASSERT_TRUE(try_dequeue(v)); |
126 | /// ASSERT_EQ(v, 2); |
127 | /// ASSERT_TRUE(try_dequeue_until(v, now() + seconds(1))); |
128 | /// ASSERT_EQ(v, 3); |
129 | /// ASSERT_TRUE(q.empty()); |
130 | /// ASSERT_EQ(q.size(), 0); |
131 | /// ASSERT_FALSE(try_dequeue(v)); |
132 | /// ASSERT_FALSE(try_dequeue_for(v, microseconds(100))); |
133 | /// @endcode |
134 | /// |
135 | /// Design: |
136 | /// - The queue is composed of one or more segments. Each segment has |
137 | /// a fixed size of 2^LgSegmentSize entries. Each segment is used |
138 | /// exactly once. |
139 | /// - Each entry is composed of a futex and a single element. |
140 | /// - The queue contains two 64-bit ticket variables. The producer |
141 | /// ticket counts the number of producer tickets issued so far, and |
142 | /// the same for the consumer ticket. Each ticket number corresponds |
143 | /// to a specific entry in a specific segment. |
144 | /// - The queue maintains two pointers, head and tail. Head points to |
145 | /// the segment that corresponds to the current consumer |
146 | /// ticket. Similarly, tail pointer points to the segment that |
147 | /// corresponds to the producer ticket. |
148 | /// - Segments are organized as a singly linked list. |
149 | /// - The producer with the first ticket in the current producer |
150 | /// segment has primary responsibility for allocating and linking |
///   the next segment. Other producers and consumers may help do so
152 | /// when needed if that thread is delayed. |
153 | /// - The producer with the last ticket in the current producer |
154 | /// segment is primarily responsible for advancing the tail pointer |
155 | /// to the next segment. Other producers and consumers may help do |
156 | /// so when needed if that thread is delayed. |
157 | /// - Similarly, the consumer with the last ticket in the current |
158 | /// consumer segment is primarily responsible for advancing the head |
159 | /// pointer to the next segment. Other consumers may help do so when |
160 | /// needed if that thread is delayed. |
161 | /// - The tail pointer must not lag behind the head pointer. |
162 | /// Otherwise, the algorithm cannot be certain about the removal of |
163 | /// segment and would have to incur higher costs to ensure safe |
164 | /// reclamation. Consumers must ensure that head never overtakes |
165 | /// tail. |
166 | /// |
167 | /// Memory Usage: |
168 | /// - An empty queue contains one segment. A nonempty queue contains |
///   one or two more segments than fit its contents.
170 | /// - Removed segments are not reclaimed until there are no threads, |
171 | /// producers or consumers, with references to them or their |
172 | /// predecessors. That is, a lagging thread may delay the reclamation |
173 | /// of a chain of removed segments. |
174 | /// - The template parameter LgAlign can be used to reduce memory usage |
175 | /// at the cost of increased chance of false sharing. |
176 | /// |
177 | /// Performance considerations: |
178 | /// - All operations take constant time, excluding the costs of |
179 | /// allocation, reclamation, interference from other threads, and |
180 | /// waiting for actions by other threads. |
/// - In general, using the single producer and/or single consumer
182 | /// variants yield better performance than the MP and MC |
183 | /// alternatives. |
184 | /// - SPSC without blocking is the fastest configuration. It doesn't |
185 | /// include any read-modify-write atomic operations, full fences, or |
186 | /// system calls in the critical path. |
187 | /// - MP adds a fetch_add to the critical path of each producer operation. |
188 | /// - MC adds a fetch_add or compare_exchange to the critical path of |
189 | /// each consumer operation. |
190 | /// - The possibility of consumers blocking, even if they never do, |
191 | /// adds a compare_exchange to the critical path of each producer |
192 | /// operation. |
193 | /// - MPMC, SPMC, MPSC require the use of a deferred reclamation |
194 | /// mechanism to guarantee that segments removed from the linked |
195 | /// list, i.e., unreachable from the head pointer, are reclaimed |
196 | /// only after they are no longer needed by any lagging producers or |
197 | /// consumers. |
198 | /// - The overheads of segment allocation and reclamation are intended |
199 | /// to be mostly out of the critical path of the queue's throughput. |
200 | /// - If the template parameter LgSegmentSize is changed, it should be |
201 | /// set adequately high to keep the amortized cost of allocation and |
202 | /// reclamation low. |
203 | /// - It is recommended to measure performance with different variants |
204 | /// when applicable, e.g., UMPMC vs UMPSC. Depending on the use |
205 | /// case, sometimes the variant with the higher sequential overhead |
206 | /// may yield better results due to, for example, more favorable |
207 | /// producer-consumer balance or favorable timing for avoiding |
208 | /// costly blocking. |
209 | |
210 | template < |
211 | typename T, |
212 | bool SingleProducer, |
213 | bool SingleConsumer, |
214 | bool MayBlock, |
215 | size_t LgSegmentSize = 8, |
216 | size_t LgAlign = constexpr_log2(hardware_destructive_interference_size), |
217 | template <typename> class Atom = std::atomic> |
218 | class UnboundedQueue { |
219 | using Ticket = uint64_t; |
220 | class Entry; |
221 | class Segment; |
222 | |
223 | static constexpr bool SPSC = SingleProducer && SingleConsumer; |
224 | static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27; |
225 | static constexpr size_t SegmentSize = 1u << LgSegmentSize; |
226 | static constexpr size_t Align = 1u << LgAlign; |
227 | |
228 | static_assert( |
229 | std::is_nothrow_destructible<T>::value, |
230 | "T must be nothrow_destructible" ); |
231 | static_assert((Stride & 1) == 1, "Stride must be odd" ); |
232 | static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32" ); |
233 | static_assert(LgAlign < 16, "LgAlign must be < 16" ); |
234 | |
235 | using Sem = folly::SaturatingSemaphore<MayBlock, Atom>; |
236 | |
237 | struct Consumer { |
238 | Atom<Segment*> head; |
239 | Atom<Ticket> ticket; |
240 | hazptr_obj_batch<Atom> batch; |
241 | explicit Consumer(Segment* s) : head(s), ticket(0) { |
242 | s->set_batch_no_tag(&batch); // defined in hazptr_obj |
243 | } |
244 | }; |
245 | struct Producer { |
246 | Atom<Segment*> tail; |
247 | Atom<Ticket> ticket; |
248 | explicit Producer(Segment* s) : tail(s), ticket(0) {} |
249 | }; |
250 | |
251 | alignas(Align) Consumer c_; |
252 | alignas(Align) Producer p_; |
253 | |
254 | public: |
255 | /** constructor */ |
256 | UnboundedQueue() |
257 | : c_(new Segment(0)), p_(c_.head.load(std::memory_order_relaxed)) {} |
258 | |
259 | /** destructor */ |
260 | ~UnboundedQueue() { |
261 | cleanUpRemainingItems(); |
262 | c_.batch.shutdown_and_reclaim(); |
263 | reclaimRemainingSegments(); |
264 | } |
265 | |
266 | /** enqueue */ |
267 | FOLLY_ALWAYS_INLINE void enqueue(const T& arg) { |
268 | enqueueImpl(arg); |
269 | } |
270 | |
271 | FOLLY_ALWAYS_INLINE void enqueue(T&& arg) { |
272 | enqueueImpl(std::move(arg)); |
273 | } |
274 | |
275 | /** dequeue */ |
276 | FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept { |
277 | dequeueImpl(item); |
278 | } |
279 | |
280 | FOLLY_ALWAYS_INLINE T dequeue() noexcept { |
281 | T item; |
282 | dequeueImpl(item); |
283 | return item; |
284 | } |
285 | |
286 | /** try_dequeue */ |
287 | FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept { |
288 | auto o = try_dequeue(); |
289 | if (LIKELY(o.has_value())) { |
290 | item = std::move(*o); |
291 | return true; |
292 | } |
293 | return false; |
294 | } |
295 | |
296 | FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue() noexcept { |
297 | return tryDequeueUntil(std::chrono::steady_clock::time_point::min()); |
298 | } |
299 | |
300 | /** try_dequeue_until */ |
301 | template <typename Clock, typename Duration> |
302 | FOLLY_ALWAYS_INLINE bool try_dequeue_until( |
303 | T& item, |
304 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
305 | folly::Optional<T> o = try_dequeue_until(deadline); |
306 | |
307 | if (LIKELY(o.has_value())) { |
308 | item = std::move(*o); |
309 | return true; |
310 | } |
311 | |
312 | return false; |
313 | } |
314 | |
315 | template <typename Clock, typename Duration> |
316 | FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_until( |
317 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
318 | return tryDequeueUntil(deadline); |
319 | } |
320 | |
321 | /** try_dequeue_for */ |
322 | template <typename Rep, typename Period> |
323 | FOLLY_ALWAYS_INLINE bool try_dequeue_for( |
324 | T& item, |
325 | const std::chrono::duration<Rep, Period>& duration) noexcept { |
326 | folly::Optional<T> o = try_dequeue_for(duration); |
327 | |
328 | if (LIKELY(o.has_value())) { |
329 | item = std::move(*o); |
330 | return true; |
331 | } |
332 | |
333 | return false; |
334 | } |
335 | |
336 | template <typename Rep, typename Period> |
337 | FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_for( |
338 | const std::chrono::duration<Rep, Period>& duration) noexcept { |
339 | folly::Optional<T> o = try_dequeue(); |
340 | if (LIKELY(o.has_value())) { |
341 | return o; |
342 | } |
343 | return tryDequeueUntil(std::chrono::steady_clock::now() + duration); |
344 | } |
345 | |
346 | /** try_peek */ |
347 | FOLLY_ALWAYS_INLINE const T* try_peek() noexcept { |
348 | /* This function is supported only for USPSC and UMPSC queues. */ |
349 | DCHECK(SingleConsumer); |
350 | return tryPeekUntil(std::chrono::steady_clock::time_point::min()); |
351 | } |
352 | |
353 | /** size */ |
354 | size_t size() const noexcept { |
355 | auto p = producerTicket(); |
356 | auto c = consumerTicket(); |
357 | return p > c ? p - c : 0; |
358 | } |
359 | |
360 | /** empty */ |
361 | bool empty() const noexcept { |
362 | auto c = consumerTicket(); |
363 | auto p = producerTicket(); |
364 | return p <= c; |
365 | } |
366 | |
367 | private: |
368 | /** enqueueImpl */ |
369 | template <typename Arg> |
370 | FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) { |
371 | if (SPSC) { |
372 | Segment* s = tail(); |
373 | enqueueCommon(s, std::forward<Arg>(arg)); |
374 | } else { |
375 | // Using hazptr_holder instead of hazptr_local because it is |
376 | // possible that the T ctor happens to use hazard pointers. |
377 | hazptr_holder<Atom> hptr; |
378 | Segment* s = hptr.get_protected(p_.tail); |
379 | enqueueCommon(s, std::forward<Arg>(arg)); |
380 | } |
381 | } |
382 | |
383 | /** enqueueCommon */ |
384 | template <typename Arg> |
385 | FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) { |
386 | Ticket t = fetchIncrementProducerTicket(); |
387 | if (!SingleProducer) { |
388 | s = findSegment(s, t); |
389 | } |
390 | DCHECK_GE(t, s->minTicket()); |
391 | DCHECK_LT(t, s->minTicket() + SegmentSize); |
392 | size_t idx = index(t); |
393 | Entry& e = s->entry(idx); |
394 | e.putItem(std::forward<Arg>(arg)); |
395 | if (responsibleForAlloc(t)) { |
396 | allocNextSegment(s); |
397 | } |
398 | if (responsibleForAdvance(t)) { |
399 | advanceTail(s); |
400 | } |
401 | } |
402 | |
403 | /** dequeueImpl */ |
404 | FOLLY_ALWAYS_INLINE void dequeueImpl(T& item) noexcept { |
405 | if (SPSC) { |
406 | Segment* s = head(); |
407 | dequeueCommon(s, item); |
408 | } else { |
409 | // Using hazptr_holder instead of hazptr_local because it is |
410 | // possible to call the T dtor and it may happen to use hazard |
411 | // pointers. |
412 | hazptr_holder<Atom> hptr; |
413 | Segment* s = hptr.get_protected(c_.head); |
414 | dequeueCommon(s, item); |
415 | } |
416 | } |
417 | |
418 | /** dequeueCommon */ |
419 | FOLLY_ALWAYS_INLINE void dequeueCommon(Segment* s, T& item) noexcept { |
420 | Ticket t = fetchIncrementConsumerTicket(); |
421 | if (!SingleConsumer) { |
422 | s = findSegment(s, t); |
423 | } |
424 | size_t idx = index(t); |
425 | Entry& e = s->entry(idx); |
426 | e.takeItem(item); |
427 | if (responsibleForAdvance(t)) { |
428 | advanceHead(s); |
429 | } |
430 | } |
431 | |
432 | /** tryDequeueUntil */ |
433 | template <typename Clock, typename Duration> |
434 | FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntil( |
435 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
436 | if (SingleConsumer) { |
437 | Segment* s = head(); |
438 | return tryDequeueUntilSC(s, deadline); |
439 | } else { |
440 | // Using hazptr_holder instead of hazptr_local because it is |
441 | // possible to call ~T() and it may happen to use hazard pointers. |
442 | hazptr_holder<Atom> hptr; |
443 | Segment* s = hptr.get_protected(c_.head); |
444 | return tryDequeueUntilMC(s, deadline); |
445 | } |
446 | } |
447 | |
448 | /** tryDequeueUntilSC */ |
449 | template <typename Clock, typename Duration> |
450 | FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilSC( |
451 | Segment* s, |
452 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
453 | Ticket t = consumerTicket(); |
454 | DCHECK_GE(t, s->minTicket()); |
455 | DCHECK_LT(t, (s->minTicket() + SegmentSize)); |
456 | size_t idx = index(t); |
457 | Entry& e = s->entry(idx); |
458 | if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { |
459 | return folly::Optional<T>(); |
460 | } |
461 | setConsumerTicket(t + 1); |
462 | auto ret = e.takeItem(); |
463 | if (responsibleForAdvance(t)) { |
464 | advanceHead(s); |
465 | } |
466 | return ret; |
467 | } |
468 | |
469 | /** tryDequeueUntilMC */ |
470 | template <typename Clock, typename Duration> |
471 | FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilMC( |
472 | Segment* s, |
473 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
474 | while (true) { |
475 | Ticket t = consumerTicket(); |
476 | if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { |
477 | s = getAllocNextSegment(s, t); |
478 | DCHECK(s); |
479 | continue; |
480 | } |
481 | size_t idx = index(t); |
482 | Entry& e = s->entry(idx); |
483 | if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { |
484 | return folly::Optional<T>(); |
485 | } |
486 | if (!c_.ticket.compare_exchange_weak( |
487 | t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) { |
488 | continue; |
489 | } |
490 | auto ret = e.takeItem(); |
491 | if (responsibleForAdvance(t)) { |
492 | advanceHead(s); |
493 | } |
494 | return ret; |
495 | } |
496 | } |
497 | |
498 | /** tryDequeueWaitElem */ |
499 | template <typename Clock, typename Duration> |
500 | FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem( |
501 | Entry& e, |
502 | Ticket t, |
503 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
504 | if (LIKELY(e.tryWaitUntil(deadline))) { |
505 | return true; |
506 | } |
507 | return t < producerTicket(); |
508 | } |
509 | |
510 | /** tryPeekUntil */ |
511 | template <typename Clock, typename Duration> |
512 | FOLLY_ALWAYS_INLINE const T* tryPeekUntil( |
513 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
514 | Segment* s = head(); |
515 | Ticket t = consumerTicket(); |
516 | DCHECK_GE(t, s->minTicket()); |
517 | DCHECK_LT(t, (s->minTicket() + SegmentSize)); |
518 | size_t idx = index(t); |
519 | Entry& e = s->entry(idx); |
520 | if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { |
521 | return nullptr; |
522 | } |
523 | return e.peekItem(); |
524 | } |
525 | |
526 | /** findSegment */ |
527 | FOLLY_ALWAYS_INLINE |
528 | Segment* findSegment(Segment* s, const Ticket t) noexcept { |
529 | while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { |
530 | s = getAllocNextSegment(s, t); |
531 | DCHECK(s); |
532 | } |
533 | return s; |
534 | } |
535 | |
536 | /** getAllocNextSegment */ |
537 | Segment* getAllocNextSegment(Segment* s, Ticket t) noexcept { |
538 | Segment* next = s->nextSegment(); |
539 | if (!next) { |
540 | DCHECK_GE(t, s->minTicket() + SegmentSize); |
541 | auto diff = t - (s->minTicket() + SegmentSize); |
542 | if (diff > 0) { |
543 | auto dur = std::chrono::microseconds(diff); |
544 | auto deadline = std::chrono::steady_clock::now() + dur; |
545 | WaitOptions opt; |
546 | opt.spin_max(dur); |
547 | detail::spin_pause_until( |
548 | deadline, opt, [s] { return s->nextSegment(); }); |
549 | next = s->nextSegment(); |
550 | if (next) { |
551 | return next; |
552 | } |
553 | } |
554 | next = allocNextSegment(s); |
555 | } |
556 | DCHECK(next); |
557 | return next; |
558 | } |
559 | |
560 | /** allocNextSegment */ |
561 | Segment* allocNextSegment(Segment* s) { |
562 | auto t = s->minTicket() + SegmentSize; |
563 | Segment* next = new Segment(t); |
564 | next->set_batch_no_tag(&c_.batch); // defined in hazptr_obj |
565 | next->acquire_ref_safe(); // defined in hazptr_obj_base_linked |
566 | if (!s->casNextSegment(next)) { |
567 | delete next; |
568 | next = s->nextSegment(); |
569 | } |
570 | DCHECK(next); |
571 | return next; |
572 | } |
573 | |
574 | /** advanceTail */ |
575 | void advanceTail(Segment* s) noexcept { |
576 | if (SPSC) { |
577 | Segment* next = s->nextSegment(); |
578 | DCHECK(next); |
579 | setTail(next); |
580 | } else { |
581 | Ticket t = s->minTicket() + SegmentSize; |
582 | advanceTailToTicket(t); |
583 | } |
584 | } |
585 | |
586 | /** advanceTailToTicket */ |
587 | void advanceTailToTicket(Ticket t) noexcept { |
588 | Segment* s = tail(); |
589 | while (s->minTicket() < t) { |
590 | Segment* next = s->nextSegment(); |
591 | if (!next) { |
592 | next = allocNextSegment(s); |
593 | } |
594 | DCHECK(next); |
595 | casTail(s, next); |
596 | s = tail(); |
597 | } |
598 | } |
599 | |
600 | /** advanceHead */ |
601 | void advanceHead(Segment* s) noexcept { |
602 | if (SPSC) { |
603 | while (tail() == s) { |
604 | /* Wait for producer to advance tail. */ |
605 | asm_volatile_pause(); |
606 | } |
607 | Segment* next = s->nextSegment(); |
608 | DCHECK(next); |
609 | setHead(next); |
610 | reclaimSegment(s); |
611 | } else { |
612 | Ticket t = s->minTicket() + SegmentSize; |
613 | advanceHeadToTicket(t); |
614 | } |
615 | } |
616 | |
617 | /** advanceHeadToTicket */ |
618 | void advanceHeadToTicket(Ticket t) noexcept { |
619 | /* Tail must not lag behind head. Otherwise, the algorithm cannot |
620 | be certain about removal of segments. */ |
621 | advanceTailToTicket(t); |
622 | Segment* s = head(); |
623 | if (SingleConsumer) { |
624 | DCHECK_EQ(s->minTicket() + SegmentSize, t); |
625 | Segment* next = s->nextSegment(); |
626 | DCHECK(next); |
627 | setHead(next); |
628 | reclaimSegment(s); |
629 | } else { |
630 | while (s->minTicket() < t) { |
631 | Segment* next = s->nextSegment(); |
632 | DCHECK(next); |
633 | if (casHead(s, next)) { |
634 | reclaimSegment(s); |
635 | s = next; |
636 | } |
637 | } |
638 | } |
639 | } |
640 | |
641 | /** reclaimSegment */ |
642 | void reclaimSegment(Segment* s) noexcept { |
643 | if (SPSC) { |
644 | delete s; |
645 | } else { |
646 | s->retire(); // defined in hazptr_obj_base_linked |
647 | } |
648 | } |
649 | |
650 | /** cleanUpRemainingItems */ |
651 | void cleanUpRemainingItems() { |
652 | auto end = producerTicket(); |
653 | auto s = head(); |
654 | for (auto t = consumerTicket(); t < end; ++t) { |
655 | if (t >= s->minTicket() + SegmentSize) { |
656 | s = s->nextSegment(); |
657 | } |
658 | DCHECK_LT(t, (s->minTicket() + SegmentSize)); |
659 | auto idx = index(t); |
660 | auto& e = s->entry(idx); |
661 | e.destroyItem(); |
662 | } |
663 | } |
664 | |
665 | /** reclaimRemainingSegments */ |
666 | void reclaimRemainingSegments() { |
667 | auto h = head(); |
668 | auto s = h->nextSegment(); |
669 | h->setNextSegment(nullptr); |
670 | reclaimSegment(h); |
671 | while (s) { |
672 | auto next = s->nextSegment(); |
673 | delete s; |
674 | s = next; |
675 | } |
676 | } |
677 | |
678 | FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept { |
679 | return (t * Stride) & (SegmentSize - 1); |
680 | } |
681 | |
682 | FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept { |
683 | return (t & (SegmentSize - 1)) == 0; |
684 | } |
685 | |
686 | FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept { |
687 | return (t & (SegmentSize - 1)) == (SegmentSize - 1); |
688 | } |
689 | |
690 | FOLLY_ALWAYS_INLINE Segment* head() const noexcept { |
691 | return c_.head.load(std::memory_order_acquire); |
692 | } |
693 | |
694 | FOLLY_ALWAYS_INLINE Segment* tail() const noexcept { |
695 | return p_.tail.load(std::memory_order_acquire); |
696 | } |
697 | |
698 | FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept { |
699 | return p_.ticket.load(std::memory_order_acquire); |
700 | } |
701 | |
702 | FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept { |
703 | return c_.ticket.load(std::memory_order_acquire); |
704 | } |
705 | |
706 | void setHead(Segment* s) noexcept { |
707 | DCHECK(SingleConsumer); |
708 | c_.head.store(s, std::memory_order_relaxed); |
709 | } |
710 | |
711 | void setTail(Segment* s) noexcept { |
712 | DCHECK(SPSC); |
713 | p_.tail.store(s, std::memory_order_release); |
714 | } |
715 | |
716 | bool (Segment*& s, Segment* next) noexcept { |
717 | DCHECK(!SingleConsumer); |
718 | return c_.head.compare_exchange_strong( |
719 | s, next, std::memory_order_release, std::memory_order_acquire); |
720 | } |
721 | |
722 | void casTail(Segment*& s, Segment* next) noexcept { |
723 | DCHECK(!SPSC); |
724 | p_.tail.compare_exchange_strong( |
725 | s, next, std::memory_order_release, std::memory_order_relaxed); |
726 | } |
727 | |
728 | FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept { |
729 | p_.ticket.store(t, std::memory_order_release); |
730 | } |
731 | |
732 | FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept { |
733 | c_.ticket.store(t, std::memory_order_release); |
734 | } |
735 | |
736 | FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept { |
737 | if (SingleConsumer) { |
738 | Ticket oldval = consumerTicket(); |
739 | setConsumerTicket(oldval + 1); |
740 | return oldval; |
741 | } else { // MC |
742 | return c_.ticket.fetch_add(1, std::memory_order_acq_rel); |
743 | } |
744 | } |
745 | |
746 | FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept { |
747 | if (SingleProducer) { |
748 | Ticket oldval = producerTicket(); |
749 | setProducerTicket(oldval + 1); |
750 | return oldval; |
751 | } else { // MP |
752 | return p_.ticket.fetch_add(1, std::memory_order_acq_rel); |
753 | } |
754 | } |
755 | |
756 | /** |
757 | * Entry |
758 | */ |
759 | class Entry { |
760 | Sem flag_; |
761 | aligned_storage_for_t<T> item_; |
762 | |
763 | public: |
764 | template <typename Arg> |
765 | FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) { |
766 | new (&item_) T(std::forward<Arg>(arg)); |
767 | flag_.post(); |
768 | } |
769 | |
770 | FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept { |
771 | flag_.wait(); |
772 | getItem(item); |
773 | } |
774 | |
775 | FOLLY_ALWAYS_INLINE folly::Optional<T> takeItem() noexcept { |
776 | flag_.wait(); |
777 | return getItem(); |
778 | } |
779 | |
780 | FOLLY_ALWAYS_INLINE const T* peekItem() noexcept { |
781 | flag_.wait(); |
782 | return itemPtr(); |
783 | } |
784 | |
785 | template <typename Clock, typename Duration> |
786 | FOLLY_EXPORT FOLLY_ALWAYS_INLINE bool tryWaitUntil( |
787 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
788 | // wait-options from benchmarks on contended queues: |
789 | static constexpr auto const opt = |
790 | Sem::wait_options().spin_max(std::chrono::microseconds(10)); |
791 | return flag_.try_wait_until(deadline, opt); |
792 | } |
793 | |
794 | FOLLY_ALWAYS_INLINE void destroyItem() noexcept { |
795 | itemPtr()->~T(); |
796 | } |
797 | |
798 | private: |
799 | FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept { |
800 | item = std::move(*(itemPtr())); |
801 | destroyItem(); |
802 | } |
803 | |
804 | FOLLY_ALWAYS_INLINE folly::Optional<T> getItem() noexcept { |
805 | folly::Optional<T> ret = std::move(*(itemPtr())); |
806 | destroyItem(); |
807 | return ret; |
808 | } |
809 | |
810 | FOLLY_ALWAYS_INLINE T* itemPtr() noexcept { |
811 | return static_cast<T*>(static_cast<void*>(&item_)); |
812 | } |
813 | }; // Entry |
814 | |
815 | /** |
816 | * Segment |
817 | */ |
818 | class Segment : public hazptr_obj_base_linked<Segment, Atom> { |
819 | Atom<Segment*> next_{nullptr}; |
820 | const Ticket min_; |
821 | alignas(Align) Entry b_[SegmentSize]; |
822 | |
823 | public: |
824 | explicit Segment(const Ticket t) noexcept : min_(t) {} |
825 | |
826 | Segment* nextSegment() const noexcept { |
827 | return next_.load(std::memory_order_acquire); |
828 | } |
829 | |
830 | void setNextSegment(Segment* next) { |
831 | next_.store(next, std::memory_order_relaxed); |
832 | } |
833 | |
834 | bool casNextSegment(Segment* next) noexcept { |
835 | Segment* expected = nullptr; |
836 | return next_.compare_exchange_strong( |
837 | expected, next, std::memory_order_release, std::memory_order_relaxed); |
838 | } |
839 | |
840 | FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept { |
841 | DCHECK_EQ((min_ & (SegmentSize - 1)), Ticket(0)); |
842 | return min_; |
843 | } |
844 | |
845 | FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept { |
846 | return b_[index]; |
847 | } |
848 | |
849 | template <typename S> |
850 | void push_links(bool m, S& s) { |
851 | if (m == false) { // next_ is immutable |
852 | auto p = nextSegment(); |
853 | if (p) { |
854 | s.push(p); |
855 | } |
856 | } |
857 | } |
858 | }; // Segment |
859 | |
860 | }; // UnboundedQueue |
861 | |
862 | /* Aliases */ |
863 | |
// Unbounded single-producer, single-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPSCQueue =
    UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;

// Unbounded multi-producer, single-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPSCQueue =
    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;

// Unbounded single-producer, multi-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPMCQueue =
    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;

// Unbounded multi-producer, multi-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPMCQueue =
    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
899 | |
900 | } // namespace folly |
901 | |