1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <folly/Executor.h>
20#include <folly/MPMCQueue.h>
21#include <folly/Range.h>
22#include <folly/executors/task_queue/BlockingQueue.h>
23#include <folly/synchronization/LifoSem.h>
24#include <glog/logging.h>
25
26namespace folly {
27
28template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
29class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
30 public:
31 // Note A: The queue pre-allocates all memory for max_capacity
32 // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
33 // MID_PRI and HI_PRI are treated at the same priority level.
34 PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
35 queues_.reserve(numPriorities);
36 for (int8_t i = 0; i < numPriorities; i++) {
37 queues_.emplace_back(max_capacity);
38 }
39 }
40
41 PriorityLifoSemMPMCQueue(folly::Range<const size_t*> capacities) {
42 CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported";
43
44 queues_.reserve(capacities.size());
45 for (auto capacity : capacities) {
46 queues_.emplace_back(capacity);
47 }
48 }
49
50 uint8_t getNumPriorities() override {
51 return queues_.size();
52 }
53
54 // Add at medium priority by default
55 BlockingQueueAddResult add(T item) override {
56 return addWithPriority(std::move(item), folly::Executor::MID_PRI);
57 }
58
59 BlockingQueueAddResult addWithPriority(T item, int8_t priority) override {
60 int mid = getNumPriorities() / 2;
61 size_t queue = priority < 0
62 ? std::max(0, mid + priority)
63 : std::min(getNumPriorities() - 1, mid + priority);
64 CHECK_LT(queue, queues_.size());
65 switch (kBehavior) { // static
66 case QueueBehaviorIfFull::THROW:
67 if (!queues_[queue].write(std::move(item))) {
68 throw QueueFullException("LifoSemMPMCQueue full, can't add item");
69 }
70 break;
71 case QueueBehaviorIfFull::BLOCK:
72 queues_[queue].blockingWrite(std::move(item));
73 break;
74 }
75 return sem_.post();
76 }
77
78 T take() override {
79 T item;
80 while (true) {
81 if (nonBlockingTake(item)) {
82 return item;
83 }
84 sem_.wait();
85 }
86 }
87
88 folly::Optional<T> try_take_for(std::chrono::milliseconds time) override {
89 T item;
90 while (true) {
91 if (nonBlockingTake(item)) {
92 return std::move(item);
93 }
94 if (!sem_.try_wait_for(time)) {
95 return folly::none;
96 }
97 }
98 }
99
100 bool nonBlockingTake(T& item) {
101 for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
102 if (it->readIfNotEmpty(item)) {
103 return true;
104 }
105 }
106 return false;
107 }
108
109 size_t size() override {
110 size_t size = 0;
111 for (auto& q : queues_) {
112 size += q.size();
113 }
114 return size;
115 }
116
117 size_t sizeGuess() const {
118 size_t size = 0;
119 for (auto& q : queues_) {
120 size += q.sizeGuess();
121 }
122 return size;
123 }
124
125 private:
126 folly::LifoSem sem_;
127 std::vector<folly::MPMCQueue<T>> queues_;
128};
129
130} // namespace folly
131