1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/Executor.h> |
20 | #include <folly/MPMCQueue.h> |
21 | #include <folly/Range.h> |
22 | #include <folly/executors/task_queue/BlockingQueue.h> |
23 | #include <folly/synchronization/LifoSem.h> |
24 | #include <glog/logging.h> |
25 | |
26 | namespace folly { |
27 | |
28 | template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW> |
29 | class PriorityLifoSemMPMCQueue : public BlockingQueue<T> { |
30 | public: |
31 | // Note A: The queue pre-allocates all memory for max_capacity |
32 | // Note B: To use folly::Executor::*_PRI, for numPriorities == 2 |
33 | // MID_PRI and HI_PRI are treated at the same priority level. |
34 | PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) { |
35 | queues_.reserve(numPriorities); |
36 | for (int8_t i = 0; i < numPriorities; i++) { |
37 | queues_.emplace_back(max_capacity); |
38 | } |
39 | } |
40 | |
41 | PriorityLifoSemMPMCQueue(folly::Range<const size_t*> capacities) { |
42 | CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported" ; |
43 | |
44 | queues_.reserve(capacities.size()); |
45 | for (auto capacity : capacities) { |
46 | queues_.emplace_back(capacity); |
47 | } |
48 | } |
49 | |
50 | uint8_t getNumPriorities() override { |
51 | return queues_.size(); |
52 | } |
53 | |
54 | // Add at medium priority by default |
55 | BlockingQueueAddResult add(T item) override { |
56 | return addWithPriority(std::move(item), folly::Executor::MID_PRI); |
57 | } |
58 | |
59 | BlockingQueueAddResult addWithPriority(T item, int8_t priority) override { |
60 | int mid = getNumPriorities() / 2; |
61 | size_t queue = priority < 0 |
62 | ? std::max(0, mid + priority) |
63 | : std::min(getNumPriorities() - 1, mid + priority); |
64 | CHECK_LT(queue, queues_.size()); |
65 | switch (kBehavior) { // static |
66 | case QueueBehaviorIfFull::THROW: |
67 | if (!queues_[queue].write(std::move(item))) { |
68 | throw QueueFullException("LifoSemMPMCQueue full, can't add item" ); |
69 | } |
70 | break; |
71 | case QueueBehaviorIfFull::BLOCK: |
72 | queues_[queue].blockingWrite(std::move(item)); |
73 | break; |
74 | } |
75 | return sem_.post(); |
76 | } |
77 | |
78 | T take() override { |
79 | T item; |
80 | while (true) { |
81 | if (nonBlockingTake(item)) { |
82 | return item; |
83 | } |
84 | sem_.wait(); |
85 | } |
86 | } |
87 | |
88 | folly::Optional<T> try_take_for(std::chrono::milliseconds time) override { |
89 | T item; |
90 | while (true) { |
91 | if (nonBlockingTake(item)) { |
92 | return std::move(item); |
93 | } |
94 | if (!sem_.try_wait_for(time)) { |
95 | return folly::none; |
96 | } |
97 | } |
98 | } |
99 | |
100 | bool nonBlockingTake(T& item) { |
101 | for (auto it = queues_.rbegin(); it != queues_.rend(); it++) { |
102 | if (it->readIfNotEmpty(item)) { |
103 | return true; |
104 | } |
105 | } |
106 | return false; |
107 | } |
108 | |
109 | size_t size() override { |
110 | size_t size = 0; |
111 | for (auto& q : queues_) { |
112 | size += q.size(); |
113 | } |
114 | return size; |
115 | } |
116 | |
117 | size_t sizeGuess() const { |
118 | size_t size = 0; |
119 | for (auto& q : queues_) { |
120 | size += q.sizeGuess(); |
121 | } |
122 | return size; |
123 | } |
124 | |
125 | private: |
126 | folly::LifoSem sem_; |
127 | std::vector<folly::MPMCQueue<T>> queues_; |
128 | }; |
129 | |
130 | } // namespace folly |
131 | |