1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/concurrency/CacheLocality.h> |
20 | #include <folly/concurrency/UnboundedQueue.h> |
21 | |
22 | #include <glog/logging.h> |
23 | |
24 | #include <atomic> |
25 | #include <chrono> |
26 | |
27 | namespace folly { |
28 | |
29 | /// DynamicBoundedQueue supports: |
/// - Dynamic memory usage that grows and shrinks in proportion to the
///   number of elements in the queue.
32 | /// - Adjustable capacity that helps throttle pathological cases of |
33 | /// producer-consumer imbalance that may lead to excessive memory |
34 | /// usage. |
35 | /// - The adjustable capacity can also help prevent deadlock by |
36 | /// allowing users to temporarily increase capacity substantially to |
37 | /// guarantee accommodating producer requests that cannot wait. |
38 | /// - SPSC, SPMC, MPSC, MPMC variants. |
39 | /// - Blocking and spinning-only variants. |
40 | /// - Inter-operable non-waiting, timed until, timed for, and waiting |
41 | /// variants of producer and consumer operations. |
42 | /// - Optional variable element weights. |
43 | /// |
44 | /// Element Weights |
45 | /// - Queue elements may have variable weights (calculated using a |
46 | /// template parameter) that are by default 1. |
47 | /// - Element weights count towards the queue's capacity. |
/// - Element weights are not priorities and do not affect element
///   order. Queues with variable element weights follow FIFO order,
///   the same as default queues.
51 | /// |
52 | /// When to use DynamicBoundedQueue: |
53 | /// - If a small maximum capacity may lead to deadlock or performance |
54 | /// degradation under bursty patterns and a larger capacity is |
55 | /// sufficient. |
/// - If the typical queue size is expected to be much lower than the
///   maximum capacity.
58 | /// - If an unbounded queue is susceptible to growing too much. |
59 | /// - If support for variable element weights is needed. |
60 | /// |
61 | /// When not to use DynamicBoundedQueue? |
62 | /// - If dynamic memory allocation is unacceptable or if the maximum |
63 | /// capacity needs to be small, then use fixed-size MPMCQueue or (if |
64 | /// non-blocking SPSC) ProducerConsumerQueue. |
65 | /// - If there is no risk of the queue growing too much, then use |
66 | /// UnboundedQueue. |
67 | /// |
68 | /// Setting capacity |
69 | /// - The general rule is to set the capacity as high as acceptable. |
70 | /// The queue performs best when it is not near full capacity. |
71 | /// - The implementation may allow extra slack in capacity (~10%) for |
72 | /// amortizing some costly steps. Therefore, precise capacity is not |
73 | /// guaranteed and cannot be relied on for synchronization; i.e., |
74 | /// this queue cannot be used as a semaphore. |
75 | /// |
76 | /// Performance expectations: |
77 | /// - As long as the queue size is below capacity in the common case, |
78 | /// performance is comparable to MPMCQueue and better in cases of |
79 | /// higher producer demand. |
80 | /// - Performance degrades gracefully at full capacity. |
81 | /// - It is recommended to measure performance with different variants |
82 | /// when applicable, e.g., DMPMC vs DMPSC. Depending on the use |
83 | /// case, sometimes the variant with the higher sequential overhead |
84 | /// may yield better results due to, for example, more favorable |
85 | /// producer-consumer balance or favorable timing for avoiding |
86 | /// costly blocking. |
87 | /// - See DynamicBoundedQueueTest.cpp for some benchmark results. |
88 | /// |
89 | /// Template parameters: |
90 | /// - T: element type |
91 | /// - SingleProducer: true if there can be only one producer at a |
92 | /// time. |
93 | /// - SingleConsumer: true if there can be only one consumer at a |
94 | /// time. |
95 | /// - MayBlock: true if producers or consumers may block. |
96 | /// - LgSegmentSize (default 8): Log base 2 of number of elements per |
97 | /// UnboundedQueue segment. |
98 | /// - LgAlign (default 7): Log base 2 of alignment directive; can be |
99 | /// used to balance scalability (avoidance of false sharing) with |
100 | /// memory efficiency. |
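/// - WeightFn (default DefaultWeightFn<T>): A customizable weight
///   computing type for computing the weights of elements. The default
///   weight is 1.
/// - Atom (default std::atomic): The atomic template used for the
///   queue's internal atomic variables.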
103 | /// |
104 | /// Template Aliases: |
///   DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign, WeightFn, Atom>
///   DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign, WeightFn, Atom>
///   DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign, WeightFn, Atom>
///   DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign, WeightFn, Atom>
109 | /// |
110 | /// Functions: |
111 | /// Constructor |
112 | /// Takes a capacity value as an argument. |
113 | /// |
114 | /// Producer functions: |
115 | /// void enqueue(const T&); |
116 | /// void enqueue(T&&); |
117 | /// Adds an element to the end of the queue. Waits until |
118 | /// capacity is available if necessary. |
119 | /// bool try_enqueue(const T&); |
120 | /// bool try_enqueue(T&&); |
121 | /// Tries to add an element to the end of the queue if |
///       capacity allows it. Returns true if successful. Otherwise
///       returns false.
124 | /// bool try_enqueue_until(const T&, time_point& deadline); |
125 | /// bool try_enqueue_until(T&&, time_point& deadline); |
126 | /// Tries to add an element to the end of the queue if |
127 | /// capacity allows it until the specified deadline. Returns |
128 | /// true if successful, otherwise false. |
129 | /// bool try_enqueue_for(const T&, duration&); |
130 | /// bool try_enqueue_for(T&&, duration&); |
131 | /// Tries to add an element to the end of the queue if |
132 | /// capacity allows until the expiration of the specified |
133 | /// duration. Returns true if successful, otherwise false. |
134 | /// |
135 | /// Consumer functions: |
136 | /// void dequeue(T&); |
137 | /// Extracts an element from the front of the queue. Waits |
138 | /// until an element is available if necessary. |
139 | /// bool try_dequeue(T&); |
///       Tries to extract an element from the front of the queue
///       if available. Returns true if successful, otherwise false.
142 | /// bool try_dequeue_until(T&, time_point& deadline); |
///       Tries to extract an element from the front of the queue
///       if available until the specified deadline. Returns true
///       if successful. Otherwise returns false.
146 | /// bool try_dequeue_for(T&, duration&); |
///       Tries to extract an element from the front of the queue
///       if available until the expiration of the specified
///       duration. Returns true if successful. Otherwise returns
///       false.
151 | /// |
152 | /// Secondary functions: |
153 | /// void reset_capacity(size_t capacity); |
154 | /// Changes the capacity of the queue. Does not affect the |
155 | /// current contents of the queue. Guaranteed only to affect |
156 | /// subsequent enqueue operations. May or may not affect |
157 | /// concurrent operations. Capacity must be at least 1000. |
158 | /// Weight weight(); |
159 | /// Returns an estimate of the total weight of the elements in |
160 | /// the queue. |
161 | /// size_t size(); |
162 | /// Returns an estimate of the total number of elements. |
163 | /// bool empty(); |
164 | /// Returns true only if the queue was empty during the call. |
165 | /// Note: weight(), size(), and empty() are guaranteed to be |
166 | /// accurate only if there are no concurrent changes to the queue. |
167 | /// |
168 | /// Usage example with default weight: |
169 | /// @code |
170 | /// /* DMPSC, doesn't block, 1024 int elements per segment */ |
171 | /// DMPSCQueue<int, false, 10> q(100000); |
172 | /// ASSERT_TRUE(q.empty()); |
173 | /// ASSERT_EQ(q.size(), 0); |
///   q.enqueue(1);
175 | /// ASSERT_TRUE(q.try_enqueue(2)); |
176 | /// ASSERT_TRUE(q.try_enqueue_until(3, deadline)); |
///   ASSERT_TRUE(q.try_enqueue_for(4, duration));
178 | /// // ... enqueue more elements until capacity is full |
179 | /// // See above comments about imprecise capacity guarantees |
180 | /// ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait |
181 | /// size_t sz = q.size(); |
182 | /// ASSERT_GE(sz, 100000); |
183 | /// q.reset_capacity(1000000000); // set huge capacity |
184 | /// ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds |
185 | /// q.reset_capacity(100000); // set capacity back to 100,000 |
186 | /// ASSERT_FALSE(q.try_enqueue(100002)); |
187 | /// ASSERT_EQ(q.size(), sz + 1); |
188 | /// int v; |
189 | /// q.dequeue(v); |
190 | /// ASSERT_EQ(v, 1); |
191 | /// ASSERT_TRUE(q.try_dequeue(v)); |
192 | /// ASSERT_EQ(v, 2); |
193 | /// ASSERT_TRUE(q.try_dequeue_until(v, deadline)); |
194 | /// ASSERT_EQ(v, 3); |
195 | /// ASSERT_TRUE(q.try_dequeue_for(v, duration)); |
196 | /// ASSERT_EQ(v, 4); |
197 | /// ASSERT_EQ(q.size(), sz - 3); |
198 | /// @endcode |
199 | /// |
200 | /// Usage example with custom weights: |
201 | /// @code |
202 | /// struct CustomWeightFn { |
203 | /// uint64_t operator()(int val) { return val / 100; } |
204 | /// }; |
///   DMPMCQueue<int, false, 10, 7, CustomWeightFn> q(20);
206 | /// ASSERT_TRUE(q.empty()); |
207 | /// q.enqueue(100); |
208 | /// ASSERT_TRUE(q.try_enqueue(200)); |
209 | /// ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1))); |
210 | /// ASSERT_EQ(q.size(), 3); |
211 | /// ASSERT_EQ(q.weight(), 8); |
212 | /// ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1))); |
213 | /// q.reset_capacity(1000000); // set capacity to 1000000 instead of 20 |
214 | /// ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1))); |
215 | /// q.reset_capacity(20); // set capacity to 20 again |
216 | /// ASSERT_FALSE(q.try_enqueue(100)); |
217 | /// ASSERT_EQ(q.size(), 4); |
218 | /// ASSERT_EQ(q.weight(), 25); |
219 | /// int v; |
220 | /// q.dequeue(v); |
221 | /// ASSERT_EQ(v, 100); |
222 | /// ASSERT_TRUE(q.try_dequeue(v)); |
223 | /// ASSERT_EQ(v, 200); |
224 | /// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1))); |
225 | /// ASSERT_EQ(v, 500); |
226 | /// ASSERT_EQ(q.size(), 1); |
227 | /// ASSERT_EQ(q.weight(), 17); |
228 | /// @endcode |
229 | /// |
230 | /// Design: |
231 | /// - The implementation is on top of UnboundedQueue. |
232 | /// - The main FIFO functionality is in UnboundedQueue. |
233 | /// DynamicBoundedQueue manages keeping the total queue weight |
234 | /// within the specified capacity. |
235 | /// - For the sake of scalability, the data structures are designed to |
236 | /// minimize interference between producers on one side and |
237 | /// consumers on the other. |
238 | /// - Producers add to a debit variable the weight of the added |
239 | /// element and check capacity. |
240 | /// - Consumers add to a credit variable the weight of the removed |
241 | /// element. |
242 | /// - Producers, for the sake of scalability, use fetch_add to add to |
243 | /// the debit variable and subtract if it exceeded capacity, |
244 | /// rather than using compare_exchange to avoid overshooting. |
245 | /// - Consumers, infrequently, transfer credit to a transfer variable |
246 | /// and unblock any blocked producers. The transfer variable can be |
247 | /// used by producers to decrease their debit when needed. |
248 | /// - Note that a low capacity will trigger frequent credit transfer |
249 | /// by consumers that may degrade performance. Capacity should not |
250 | /// be set too low. |
251 | /// - Transfer of credit by consumers is triggered when the amount of |
252 | /// credit reaches a threshold (1/10 of capacity). |
253 | /// - The waiting of consumers is handled in UnboundedQueue. |
254 | /// The waiting of producers is handled in this template. |
255 | /// - For a producer operation, if the difference between debit and |
256 | /// capacity (plus some slack to account for the transfer threshold) |
257 | /// does not accommodate the weight of the new element, it first |
258 | /// tries to transfer credit that may have already been made |
259 | /// available by consumers. If this is insufficient and MayBlock is |
260 | /// true, then the producer uses a futex to block until new credit |
261 | /// is transferred by a consumer. |
262 | /// |
263 | /// Memory Usage: |
264 | /// - Aside from three cache lines for managing capacity, the memory |
265 | /// for queue elements is managed using UnboundedQueue and grows and |
266 | /// shrinks dynamically with the number of elements. |
267 | /// - The template parameter LgAlign can be used to reduce memory usage |
268 | /// at the cost of increased chance of false sharing. |
269 | |
/// Default element-weight policy: every element, whatever its type or
/// value, consumes exactly one unit of queue capacity.
template <typename T>
struct DefaultWeightFn {
  template <typename U>
  uint64_t operator()(U&& /* element */) const noexcept {
    constexpr uint64_t kUnitWeight = 1;
    return kUnitWeight;
  }
};
277 | |
278 | template < |
279 | typename T, |
280 | bool SingleProducer, |
281 | bool SingleConsumer, |
282 | bool MayBlock, |
283 | size_t LgSegmentSize = 8, |
284 | size_t LgAlign = 7, |
285 | typename WeightFn = DefaultWeightFn<T>, |
286 | template <typename> class Atom = std::atomic> |
287 | class DynamicBoundedQueue { |
288 | using Weight = uint64_t; |
289 | |
290 | enum WaitingState : uint32_t { |
291 | NOTWAITING = 0, |
292 | WAITING = 1, |
293 | }; |
294 | |
295 | static constexpr bool SPSC = SingleProducer && SingleConsumer; |
296 | static constexpr size_t Align = 1u << LgAlign; |
297 | |
298 | static_assert(LgAlign < 16, "LgAlign must be < 16" ); |
299 | |
300 | /// Data members |
301 | |
302 | // Read mostly by producers |
303 | alignas(Align) Atom<Weight> debit_; // written frequently only by producers |
304 | Atom<Weight> capacity_; // written rarely by capacity resets |
305 | |
306 | // Read mostly by consumers |
307 | alignas(Align) Atom<Weight> credit_; // written frequently only by consumers |
308 | Atom<Weight> threshold_; // written rarely only by capacity resets |
309 | |
310 | // Normally written and read rarely by producers and consumers |
311 | // May be read frequently by producers when capacity is full |
312 | alignas(Align) Atom<Weight> transfer_; |
313 | detail::Futex<Atom> waiting_; |
314 | |
315 | // Underlying unbounded queue |
316 | UnboundedQueue< |
317 | T, |
318 | SingleProducer, |
319 | SingleConsumer, |
320 | MayBlock, |
321 | LgSegmentSize, |
322 | LgAlign, |
323 | Atom> |
324 | q_; |
325 | |
326 | public: |
327 | /** constructor */ |
328 | explicit DynamicBoundedQueue(Weight capacity) |
329 | : debit_(0), |
330 | capacity_(capacity + threshold(capacity)), // capacity slack |
331 | credit_(0), |
332 | threshold_(threshold(capacity)), |
333 | transfer_(0), |
334 | waiting_(0) {} |
335 | |
336 | /** destructor */ |
337 | ~DynamicBoundedQueue() {} |
338 | |
339 | /// Enqueue functions |
340 | |
341 | /** enqueue */ |
342 | FOLLY_ALWAYS_INLINE void enqueue(const T& v) { |
343 | enqueueImpl(v); |
344 | } |
345 | |
346 | FOLLY_ALWAYS_INLINE void enqueue(T&& v) { |
347 | enqueueImpl(std::move(v)); |
348 | } |
349 | |
350 | /** try_enqueue */ |
351 | FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) { |
352 | return tryEnqueueImpl(v); |
353 | } |
354 | |
355 | FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) { |
356 | return tryEnqueueImpl(std::move(v)); |
357 | } |
358 | |
359 | /** try_enqueue_until */ |
360 | template <typename Clock, typename Duration> |
361 | FOLLY_ALWAYS_INLINE bool try_enqueue_until( |
362 | const T& v, |
363 | const std::chrono::time_point<Clock, Duration>& deadline) { |
364 | return tryEnqueueUntilImpl(v, deadline); |
365 | } |
366 | |
367 | template <typename Clock, typename Duration> |
368 | FOLLY_ALWAYS_INLINE bool try_enqueue_until( |
369 | T&& v, |
370 | const std::chrono::time_point<Clock, Duration>& deadline) { |
371 | return tryEnqueueUntilImpl(std::move(v), deadline); |
372 | } |
373 | |
374 | /** try_enqueue_for */ |
375 | template <typename Rep, typename Period> |
376 | FOLLY_ALWAYS_INLINE bool try_enqueue_for( |
377 | const T& v, |
378 | const std::chrono::duration<Rep, Period>& duration) { |
379 | return tryEnqueueForImpl(v, duration); |
380 | } |
381 | |
382 | template <typename Rep, typename Period> |
383 | FOLLY_ALWAYS_INLINE bool try_enqueue_for( |
384 | T&& v, |
385 | const std::chrono::duration<Rep, Period>& duration) { |
386 | return tryEnqueueForImpl(std::move(v), duration); |
387 | } |
388 | |
389 | /// Dequeue functions |
390 | |
391 | /** dequeue */ |
392 | FOLLY_ALWAYS_INLINE void dequeue(T& elem) { |
393 | q_.dequeue(elem); |
394 | addCredit(WeightFn()(elem)); |
395 | } |
396 | |
397 | /** try_dequeue */ |
398 | FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) { |
399 | if (q_.try_dequeue(elem)) { |
400 | addCredit(WeightFn()(elem)); |
401 | return true; |
402 | } |
403 | return false; |
404 | } |
405 | |
406 | /** try_dequeue_until */ |
407 | template <typename Clock, typename Duration> |
408 | FOLLY_ALWAYS_INLINE bool try_dequeue_until( |
409 | T& elem, |
410 | const std::chrono::time_point<Clock, Duration>& deadline) { |
411 | if (q_.try_dequeue_until(elem, deadline)) { |
412 | addCredit(WeightFn()(elem)); |
413 | return true; |
414 | } |
415 | return false; |
416 | } |
417 | |
418 | /** try_dequeue_for */ |
419 | template <typename Rep, typename Period> |
420 | FOLLY_ALWAYS_INLINE bool try_dequeue_for( |
421 | T& elem, |
422 | const std::chrono::duration<Rep, Period>& duration) { |
423 | if (q_.try_dequeue_for(elem, duration)) { |
424 | addCredit(WeightFn()(elem)); |
425 | return true; |
426 | } |
427 | return false; |
428 | } |
429 | |
430 | /// Secondary functions |
431 | |
432 | /** reset_capacity */ |
433 | void reset_capacity(Weight capacity) noexcept { |
434 | Weight thresh = threshold(capacity); |
435 | capacity_.store(capacity + thresh, std::memory_order_release); |
436 | threshold_.store(thresh, std::memory_order_release); |
437 | } |
438 | |
439 | /** weight */ |
440 | Weight weight() const noexcept { |
441 | auto d = getDebit(); |
442 | auto c = getCredit(); |
443 | auto t = getTransfer(); |
444 | return d > (c + t) ? d - (c + t) : 0; |
445 | } |
446 | |
447 | /** size */ |
448 | size_t size() const noexcept { |
449 | return q_.size(); |
450 | } |
451 | |
452 | /** empty */ |
453 | bool empty() const noexcept { |
454 | return q_.empty(); |
455 | } |
456 | |
457 | private: |
458 | /// Private functions /// |
459 | |
460 | // Calculation of threshold to move credits in bulk from consumers |
461 | // to producers |
462 | constexpr Weight threshold(Weight capacity) const noexcept { |
463 | return capacity / 10; |
464 | } |
465 | |
466 | // Functions called frequently by producers |
467 | |
468 | template <typename Arg> |
469 | FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) { |
470 | tryEnqueueUntilImpl( |
471 | std::forward<Arg>(v), std::chrono::steady_clock::time_point::max()); |
472 | } |
473 | |
474 | template <typename Arg> |
475 | FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) { |
476 | return tryEnqueueUntilImpl( |
477 | std::forward<Arg>(v), std::chrono::steady_clock::time_point::min()); |
478 | } |
479 | |
480 | template <typename Clock, typename Duration, typename Arg> |
481 | FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl( |
482 | Arg&& v, |
483 | const std::chrono::time_point<Clock, Duration>& deadline) { |
484 | Weight weight = WeightFn()(std::forward<Arg>(v)); |
485 | if (LIKELY(tryAddDebit(weight))) { |
486 | q_.enqueue(std::forward<Arg>(v)); |
487 | return true; |
488 | } |
489 | return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline); |
490 | } |
491 | |
492 | template <typename Rep, typename Period, typename Arg> |
493 | FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl( |
494 | Arg&& v, |
495 | const std::chrono::duration<Rep, Period>& duration) { |
496 | if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) { |
497 | return true; |
498 | } |
499 | auto deadline = std::chrono::steady_clock::now() + duration; |
500 | return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline); |
501 | } |
502 | |
503 | FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept { |
504 | Weight capacity = getCapacity(); |
505 | Weight before = fetchAddDebit(weight); |
506 | if (LIKELY(before + weight <= capacity)) { |
507 | return true; |
508 | } else { |
509 | subDebit(weight); |
510 | return false; |
511 | } |
512 | } |
513 | |
514 | FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept { |
515 | return capacity_.load(std::memory_order_acquire); |
516 | } |
517 | |
518 | FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept { |
519 | Weight before; |
520 | if (SingleProducer) { |
521 | before = getDebit(); |
522 | debit_.store(before + weight, std::memory_order_relaxed); |
523 | } else { |
524 | before = debit_.fetch_add(weight, std::memory_order_acq_rel); |
525 | } |
526 | return before; |
527 | } |
528 | |
529 | FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept { |
530 | return debit_.load(std::memory_order_acquire); |
531 | } |
532 | |
533 | // Functions called frequently by consumers |
534 | |
535 | FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept { |
536 | Weight before = fetchAddCredit(weight); |
537 | Weight thresh = getThreshold(); |
538 | if (before + weight >= thresh && before < thresh) { |
539 | transferCredit(); |
540 | } |
541 | } |
542 | |
543 | FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept { |
544 | Weight before; |
545 | if (SingleConsumer) { |
546 | before = getCredit(); |
547 | credit_.store(before + weight, std::memory_order_relaxed); |
548 | } else { |
549 | before = credit_.fetch_add(weight, std::memory_order_acq_rel); |
550 | } |
551 | return before; |
552 | } |
553 | |
554 | FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept { |
555 | return credit_.load(std::memory_order_acquire); |
556 | } |
557 | |
558 | FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept { |
559 | return threshold_.load(std::memory_order_acquire); |
560 | } |
561 | |
562 | /** Functions called infrequently by producers */ |
563 | |
564 | void subDebit(Weight weight) noexcept { |
565 | Weight before; |
566 | if (SingleProducer) { |
567 | before = getDebit(); |
568 | debit_.store(before - weight, std::memory_order_relaxed); |
569 | } else { |
570 | before = debit_.fetch_sub(weight, std::memory_order_acq_rel); |
571 | } |
572 | DCHECK_GE(before, weight); |
573 | } |
574 | |
575 | template <typename Clock, typename Duration, typename Arg> |
576 | bool tryEnqueueUntilSlow( |
577 | Arg&& v, |
578 | const std::chrono::time_point<Clock, Duration>& deadline) { |
579 | Weight weight = WeightFn()(std::forward<Arg>(v)); |
580 | if (canEnqueue(deadline, weight)) { |
581 | q_.enqueue(std::forward<Arg>(v)); |
582 | return true; |
583 | } else { |
584 | return false; |
585 | } |
586 | } |
587 | |
588 | template <typename Clock, typename Duration> |
589 | bool canEnqueue( |
590 | const std::chrono::time_point<Clock, Duration>& deadline, |
591 | Weight weight) noexcept { |
592 | Weight capacity = getCapacity(); |
593 | while (true) { |
594 | tryReduceDebit(); |
595 | Weight debit = getDebit(); |
596 | if ((debit + weight <= capacity) && tryAddDebit(weight)) { |
597 | return true; |
598 | } |
599 | if (deadline < Clock::time_point::max() && Clock::now() >= deadline) { |
600 | return false; |
601 | } |
602 | if (MayBlock) { |
603 | if (canBlock(weight, capacity)) { |
604 | detail::futexWaitUntil(&waiting_, WAITING, deadline); |
605 | } |
606 | } else { |
607 | asm_volatile_pause(); |
608 | } |
609 | } |
610 | } |
611 | |
612 | bool canBlock(Weight weight, Weight capacity) noexcept { |
613 | waiting_.store(WAITING, std::memory_order_relaxed); |
614 | std::atomic_thread_fence(std::memory_order_seq_cst); |
615 | tryReduceDebit(); |
616 | Weight debit = getDebit(); |
617 | return debit + weight > capacity; |
618 | } |
619 | |
620 | bool tryReduceDebit() noexcept { |
621 | Weight w = takeTransfer(); |
622 | if (w > 0) { |
623 | subDebit(w); |
624 | } |
625 | return w > 0; |
626 | } |
627 | |
628 | Weight takeTransfer() noexcept { |
629 | Weight w = getTransfer(); |
630 | if (w > 0) { |
631 | w = transfer_.exchange(0, std::memory_order_acq_rel); |
632 | } |
633 | return w; |
634 | } |
635 | |
636 | Weight getTransfer() const noexcept { |
637 | return transfer_.load(std::memory_order_acquire); |
638 | } |
639 | |
640 | /** Functions called infrequently by consumers */ |
641 | |
642 | void transferCredit() noexcept { |
643 | Weight credit = takeCredit(); |
644 | transfer_.fetch_add(credit, std::memory_order_acq_rel); |
645 | if (MayBlock) { |
646 | std::atomic_thread_fence(std::memory_order_seq_cst); |
647 | waiting_.store(NOTWAITING, std::memory_order_relaxed); |
648 | detail::futexWake(&waiting_); |
649 | } |
650 | } |
651 | |
652 | Weight takeCredit() noexcept { |
653 | Weight credit; |
654 | if (SingleConsumer) { |
655 | credit = credit_.load(std::memory_order_relaxed); |
656 | credit_.store(0, std::memory_order_relaxed); |
657 | } else { |
658 | credit = credit_.exchange(0, std::memory_order_acq_rel); |
659 | } |
660 | return credit; |
661 | } |
662 | |
663 | }; // DynamicBoundedQueue |
664 | |
665 | /// Aliases |
666 | |
667 | /** DSPSCQueue */ |
668 | template < |
669 | typename T, |
670 | bool MayBlock, |
671 | size_t LgSegmentSize = 8, |
672 | size_t LgAlign = 7, |
673 | typename WeightFn = DefaultWeightFn<T>, |
674 | template <typename> class Atom = std::atomic> |
675 | using DSPSCQueue = DynamicBoundedQueue< |
676 | T, |
677 | true, |
678 | true, |
679 | MayBlock, |
680 | LgSegmentSize, |
681 | LgAlign, |
682 | WeightFn, |
683 | Atom>; |
684 | |
685 | /** DMPSCQueue */ |
686 | template < |
687 | typename T, |
688 | bool MayBlock, |
689 | size_t LgSegmentSize = 8, |
690 | size_t LgAlign = 7, |
691 | typename WeightFn = DefaultWeightFn<T>, |
692 | template <typename> class Atom = std::atomic> |
693 | using DMPSCQueue = DynamicBoundedQueue< |
694 | T, |
695 | false, |
696 | true, |
697 | MayBlock, |
698 | LgSegmentSize, |
699 | LgAlign, |
700 | WeightFn, |
701 | Atom>; |
702 | |
703 | /** DSPMCQueue */ |
704 | template < |
705 | typename T, |
706 | bool MayBlock, |
707 | size_t LgSegmentSize = 8, |
708 | size_t LgAlign = 7, |
709 | typename WeightFn = DefaultWeightFn<T>, |
710 | template <typename> class Atom = std::atomic> |
711 | using DSPMCQueue = DynamicBoundedQueue< |
712 | T, |
713 | true, |
714 | false, |
715 | MayBlock, |
716 | LgSegmentSize, |
717 | LgAlign, |
718 | WeightFn, |
719 | Atom>; |
720 | |
721 | /** DMPMCQueue */ |
722 | template < |
723 | typename T, |
724 | bool MayBlock, |
725 | size_t LgSegmentSize = 8, |
726 | size_t LgAlign = 7, |
727 | typename WeightFn = DefaultWeightFn<T>, |
728 | template <typename> class Atom = std::atomic> |
729 | using DMPMCQueue = DynamicBoundedQueue< |
730 | T, |
731 | false, |
732 | false, |
733 | MayBlock, |
734 | LgSegmentSize, |
735 | LgAlign, |
736 | WeightFn, |
737 | Atom>; |
738 | |
739 | } // namespace folly |
740 | |