1 | /* |
2 | * Copyright 2012-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | // @author Bo Hu (bhu@fb.com) |
18 | // @author Jordan DeLong (delong.j@fb.com) |
19 | |
20 | #pragma once |
21 | |
22 | #include <atomic> |
23 | #include <cassert> |
24 | #include <cstdlib> |
25 | #include <memory> |
26 | #include <stdexcept> |
27 | #include <type_traits> |
28 | #include <utility> |
29 | |
30 | #include <folly/concurrency/CacheLocality.h> |
31 | |
32 | namespace folly { |
33 | |
34 | /* |
35 | * ProducerConsumerQueue is a one producer and one consumer queue |
36 | * without locks. |
37 | */ |
38 | template <class T> |
39 | struct ProducerConsumerQueue { |
40 | typedef T value_type; |
41 | |
42 | ProducerConsumerQueue(const ProducerConsumerQueue&) = delete; |
43 | ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete; |
44 | |
45 | // size must be >= 2. |
46 | // |
47 | // Also, note that the number of usable slots in the queue at any |
48 | // given time is actually (size-1), so if you start with an empty queue, |
49 | // isFull() will return true after size-1 insertions. |
50 | explicit ProducerConsumerQueue(uint32_t size) |
51 | : size_(size), |
52 | records_(static_cast<T*>(std::malloc(sizeof(T) * size))), |
53 | readIndex_(0), |
54 | writeIndex_(0) { |
55 | assert(size >= 2); |
56 | if (!records_) { |
57 | throw std::bad_alloc(); |
58 | } |
59 | } |
60 | |
61 | ~ProducerConsumerQueue() { |
62 | // We need to destruct anything that may still exist in our queue. |
63 | // (No real synchronization needed at destructor time: only one |
64 | // thread can be doing this.) |
65 | if (!std::is_trivially_destructible<T>::value) { |
66 | size_t readIndex = readIndex_; |
67 | size_t endIndex = writeIndex_; |
68 | while (readIndex != endIndex) { |
69 | records_[readIndex].~T(); |
70 | if (++readIndex == size_) { |
71 | readIndex = 0; |
72 | } |
73 | } |
74 | } |
75 | |
76 | std::free(records_); |
77 | } |
78 | |
79 | template <class... Args> |
80 | bool write(Args&&... recordArgs) { |
81 | auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); |
82 | auto nextRecord = currentWrite + 1; |
83 | if (nextRecord == size_) { |
84 | nextRecord = 0; |
85 | } |
86 | if (nextRecord != readIndex_.load(std::memory_order_acquire)) { |
87 | new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...); |
88 | writeIndex_.store(nextRecord, std::memory_order_release); |
89 | return true; |
90 | } |
91 | |
92 | // queue is full |
93 | return false; |
94 | } |
95 | |
96 | // move (or copy) the value at the front of the queue to given variable |
97 | bool read(T& record) { |
98 | auto const currentRead = readIndex_.load(std::memory_order_relaxed); |
99 | if (currentRead == writeIndex_.load(std::memory_order_acquire)) { |
100 | // queue is empty |
101 | return false; |
102 | } |
103 | |
104 | auto nextRecord = currentRead + 1; |
105 | if (nextRecord == size_) { |
106 | nextRecord = 0; |
107 | } |
108 | record = std::move(records_[currentRead]); |
109 | records_[currentRead].~T(); |
110 | readIndex_.store(nextRecord, std::memory_order_release); |
111 | return true; |
112 | } |
113 | |
114 | // pointer to the value at the front of the queue (for use in-place) or |
115 | // nullptr if empty. |
116 | T* frontPtr() { |
117 | auto const currentRead = readIndex_.load(std::memory_order_relaxed); |
118 | if (currentRead == writeIndex_.load(std::memory_order_acquire)) { |
119 | // queue is empty |
120 | return nullptr; |
121 | } |
122 | return &records_[currentRead]; |
123 | } |
124 | |
125 | // queue must not be empty |
126 | void popFront() { |
127 | auto const currentRead = readIndex_.load(std::memory_order_relaxed); |
128 | assert(currentRead != writeIndex_.load(std::memory_order_acquire)); |
129 | |
130 | auto nextRecord = currentRead + 1; |
131 | if (nextRecord == size_) { |
132 | nextRecord = 0; |
133 | } |
134 | records_[currentRead].~T(); |
135 | readIndex_.store(nextRecord, std::memory_order_release); |
136 | } |
137 | |
138 | bool isEmpty() const { |
139 | return readIndex_.load(std::memory_order_acquire) == |
140 | writeIndex_.load(std::memory_order_acquire); |
141 | } |
142 | |
143 | bool isFull() const { |
144 | auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; |
145 | if (nextRecord == size_) { |
146 | nextRecord = 0; |
147 | } |
148 | if (nextRecord != readIndex_.load(std::memory_order_acquire)) { |
149 | return false; |
150 | } |
151 | // queue is full |
152 | return true; |
153 | } |
154 | |
155 | // * If called by consumer, then true size may be more (because producer may |
156 | // be adding items concurrently). |
157 | // * If called by producer, then true size may be less (because consumer may |
158 | // be removing items concurrently). |
159 | // * It is undefined to call this from any other thread. |
160 | size_t sizeGuess() const { |
161 | int ret = writeIndex_.load(std::memory_order_acquire) - |
162 | readIndex_.load(std::memory_order_acquire); |
163 | if (ret < 0) { |
164 | ret += size_; |
165 | } |
166 | return ret; |
167 | } |
168 | |
169 | // maximum number of items in the queue. |
170 | size_t capacity() const { |
171 | return size_ - 1; |
172 | } |
173 | |
174 | private: |
175 | using AtomicIndex = std::atomic<unsigned int>; |
176 | |
177 | char pad0_[hardware_destructive_interference_size]; |
178 | const uint32_t size_; |
179 | T* const records_; |
180 | |
181 | alignas(hardware_destructive_interference_size) AtomicIndex readIndex_; |
182 | alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_; |
183 | |
184 | char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)]; |
185 | }; |
186 | |
187 | } // namespace folly |
188 | |