1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
#include <cstdint>
#include <cstring>
#include <iostream>
#include <thread>

#include <folly/experimental/LockFreeRingBuffer.h>
#include <folly/portability/GTest.h>
#include <folly/test/DeterministicSchedule.h>
23 | |
24 | namespace folly { |
25 | |
26 | TEST(LockFreeRingBuffer, writeReadSequentially) { |
27 | const int capacity = 256; |
28 | const int turns = 4; |
29 | |
30 | LockFreeRingBuffer<int> rb(capacity); |
31 | LockFreeRingBuffer<int>::Cursor cur = rb.currentHead(); |
32 | for (unsigned int turn = 0; turn < turns; turn++) { |
33 | for (unsigned int write = 0; write < capacity; write++) { |
34 | int val = turn * capacity + write; |
35 | rb.write(val); |
36 | } |
37 | |
38 | for (unsigned int write = 0; write < capacity; write++) { |
39 | int dest = 0; |
40 | ASSERT_TRUE(rb.tryRead(dest, cur)); |
41 | ASSERT_EQ(turn * capacity + write, dest); |
42 | cur.moveForward(); |
43 | } |
44 | } |
45 | } |
46 | |
47 | TEST(LockFreeRingBuffer, writeReadSequentiallyBackward) { |
48 | const int capacity = 256; |
49 | const int turns = 4; |
50 | |
51 | LockFreeRingBuffer<int> rb(capacity); |
52 | for (unsigned int turn = 0; turn < turns; turn++) { |
53 | for (unsigned int write = 0; write < capacity; write++) { |
54 | int val = turn * capacity + write; |
55 | rb.write(val); |
56 | } |
57 | |
58 | LockFreeRingBuffer<int>::Cursor cur = rb.currentHead(); |
59 | cur.moveBackward(1); /// last write |
60 | for (int write = capacity - 1; write >= 0; write--) { |
61 | int foo = 0; |
62 | ASSERT_TRUE(rb.tryRead(foo, cur)); |
63 | ASSERT_EQ(turn * capacity + write, foo); |
64 | cur.moveBackward(); |
65 | } |
66 | } |
67 | } |
68 | |
69 | TEST(LockFreeRingBuffer, readsCanBlock) { |
70 | // Start a reader thread, confirm that reading can block |
71 | std::atomic<bool> readerHasRun(false); |
72 | LockFreeRingBuffer<int> rb(1); |
73 | auto cursor = rb.currentHead(); |
74 | cursor.moveForward(3); // wait for the 4th write |
75 | |
76 | const int sentinel = 0xfaceb00c; |
77 | |
78 | auto reader = std::thread([&]() { |
79 | int val = 0; |
80 | EXPECT_TRUE(rb.waitAndTryRead(val, cursor)); |
81 | readerHasRun = true; |
82 | EXPECT_EQ(sentinel, val); |
83 | }); |
84 | |
85 | for (int i = 0; i < 4; i++) { |
86 | EXPECT_FALSE(readerHasRun); |
87 | int val = sentinel; |
88 | rb.write(val); |
89 | } |
90 | reader.join(); |
91 | EXPECT_TRUE(readerHasRun); |
92 | } |
93 | |
94 | // expose the cursor raw value via a wrapper type |
95 | template <typename T, template <typename> class Atom> |
96 | uint64_t value(const typename LockFreeRingBuffer<T, Atom>::Cursor& rbcursor) { |
97 | typedef typename LockFreeRingBuffer<T, Atom>::Cursor RBCursor; |
98 | |
99 | struct ExposedCursor : RBCursor { |
100 | ExposedCursor(const RBCursor& cursor) : RBCursor(cursor) {} |
101 | uint64_t value() { |
102 | return this->ticket; |
103 | } |
104 | }; |
105 | return ExposedCursor(rbcursor).value(); |
106 | } |
107 | |
108 | template <template <typename> class Atom> |
109 | void runReader( |
110 | LockFreeRingBuffer<int, Atom>& rb, |
111 | std::atomic<int32_t>& writes) { |
112 | int32_t idx; |
113 | while ((idx = writes--) > 0) { |
114 | rb.write(idx); |
115 | } |
116 | } |
117 | |
118 | template <template <typename> class Atom> |
119 | void runWritesNeverFail(int capacity, int writes, int writers) { |
120 | using folly::test::DeterministicSchedule; |
121 | |
122 | DeterministicSchedule sched(DeterministicSchedule::uniform(0)); |
123 | LockFreeRingBuffer<int, Atom> rb(capacity); |
124 | |
125 | std::atomic<int32_t> writes_remaining(writes); |
126 | std::vector<std::thread> threads(writers); |
127 | |
128 | for (int i = 0; i < writers; i++) { |
129 | threads[i] = DeterministicSchedule::thread( |
130 | std::bind(runReader<Atom>, std::ref(rb), std::ref(writes_remaining))); |
131 | } |
132 | |
133 | for (auto& thread : threads) { |
134 | DeterministicSchedule::join(thread); |
135 | } |
136 | |
137 | EXPECT_EQ(writes, (value<int, Atom>)(rb.currentHead())); |
138 | } |
139 | |
140 | TEST(LockFreeRingBuffer, writesNeverFail) { |
141 | using folly::detail::EmulatedFutexAtomic; |
142 | using folly::test::DeterministicAtomic; |
143 | |
144 | runWritesNeverFail<DeterministicAtomic>(1, 100, 4); |
145 | runWritesNeverFail<DeterministicAtomic>(10, 100, 4); |
146 | runWritesNeverFail<DeterministicAtomic>(100, 1000, 8); |
147 | runWritesNeverFail<DeterministicAtomic>(1000, 10000, 16); |
148 | |
149 | runWritesNeverFail<std::atomic>(1, 100, 4); |
150 | runWritesNeverFail<std::atomic>(10, 100, 4); |
151 | runWritesNeverFail<std::atomic>(100, 1000, 8); |
152 | runWritesNeverFail<std::atomic>(1000, 10000, 16); |
153 | |
154 | runWritesNeverFail<EmulatedFutexAtomic>(1, 100, 4); |
155 | runWritesNeverFail<EmulatedFutexAtomic>(10, 100, 4); |
156 | runWritesNeverFail<EmulatedFutexAtomic>(100, 1000, 8); |
157 | runWritesNeverFail<EmulatedFutexAtomic>(1000, 10000, 16); |
158 | } |
159 | |
160 | TEST(LockFreeRingBuffer, readerCanDetectSkips) { |
161 | const int capacity = 4; |
162 | const int rounds = 4; |
163 | |
164 | LockFreeRingBuffer<int> rb(capacity); |
165 | auto cursor = rb.currentHead(); |
166 | cursor.moveForward(1); |
167 | |
168 | for (int round = 0; round < rounds; round++) { |
169 | for (int i = 0; i < capacity; i++) { |
170 | int val = round * capacity + i; |
171 | rb.write(val); |
172 | } |
173 | } |
174 | |
175 | int result = -1; |
176 | EXPECT_FALSE(rb.tryRead(result, cursor)); |
177 | EXPECT_FALSE(rb.waitAndTryRead(result, cursor)); |
178 | EXPECT_EQ(-1, result); |
179 | |
180 | cursor = rb.currentTail(); |
181 | EXPECT_TRUE(rb.tryRead(result, cursor)); |
182 | EXPECT_EQ(capacity * (rounds - 1), result); |
183 | |
184 | cursor = rb.currentTail(1.0); |
185 | EXPECT_TRUE(rb.tryRead(result, cursor)); |
186 | EXPECT_EQ((capacity * rounds) - 1, result); |
187 | } |
188 | |
189 | TEST(LockFreeRingBuffer, currentTailRange) { |
190 | const int capacity = 4; |
191 | LockFreeRingBuffer<int> rb(capacity); |
192 | |
193 | // Workaround for template deduction failure |
194 | auto (&cursorValue)(value<int, std::atomic>); |
195 | |
196 | // Empty buffer - everything points to 0 |
197 | EXPECT_EQ(0, cursorValue(rb.currentTail(0))); |
198 | EXPECT_EQ(0, cursorValue(rb.currentTail(0.5))); |
199 | EXPECT_EQ(0, cursorValue(rb.currentTail(1))); |
200 | |
201 | // Half-full |
202 | int val = 5; |
203 | rb.write(val); |
204 | rb.write(val); |
205 | |
206 | EXPECT_EQ(0, cursorValue(rb.currentTail(0))); |
207 | EXPECT_EQ(1, cursorValue(rb.currentTail(1))); |
208 | |
209 | // Full |
210 | rb.write(val); |
211 | rb.write(val); |
212 | |
213 | EXPECT_EQ(0, cursorValue(rb.currentTail(0))); |
214 | EXPECT_EQ(3, cursorValue(rb.currentTail(1))); |
215 | |
216 | auto midvalue = cursorValue(rb.currentTail(0.5)); |
217 | // both rounding behaviours are acceptable |
218 | EXPECT_TRUE(midvalue == 1 || midvalue == 2); |
219 | } |
220 | |
221 | TEST(LockFreeRingBuffer, cursorFromWrites) { |
222 | const int capacity = 3; |
223 | LockFreeRingBuffer<int> rb(capacity); |
224 | |
225 | // Workaround for template deduction failure |
226 | auto (&cursorValue)(value<int, std::atomic>); |
227 | |
228 | int val = 0xfaceb00c; |
229 | EXPECT_EQ(0, cursorValue(rb.writeAndGetCursor(val))); |
230 | EXPECT_EQ(1, cursorValue(rb.writeAndGetCursor(val))); |
231 | EXPECT_EQ(2, cursorValue(rb.writeAndGetCursor(val))); |
232 | |
233 | // Check that rb is giving out actual cursors and not just |
234 | // pointing to the current slot. |
235 | EXPECT_EQ(3, cursorValue(rb.writeAndGetCursor(val))); |
236 | } |
237 | |
238 | TEST(LockFreeRingBuffer, moveBackwardsCanFail) { |
239 | const int capacity = 3; |
240 | LockFreeRingBuffer<int> rb(capacity); |
241 | |
242 | // Workaround for template deduction failure |
243 | auto (&cursorValue)(value<int, std::atomic>); |
244 | |
245 | int val = 0xfaceb00c; |
246 | rb.write(val); |
247 | rb.write(val); |
248 | |
249 | auto cursor = rb.currentHead(); // points to 2 |
250 | EXPECT_EQ(2, cursorValue(cursor)); |
251 | EXPECT_TRUE(cursor.moveBackward()); |
252 | EXPECT_TRUE(cursor.moveBackward()); // now at 0 |
253 | EXPECT_FALSE(cursor.moveBackward()); // moving back does nothing |
254 | } |
255 | |
256 | TEST(LockFreeRingBuffer, writeReadDifferentType) { |
257 | struct FixedBuffer { |
258 | char data_[1024]; |
259 | |
260 | FixedBuffer() noexcept { |
261 | data_[0] = '\0'; |
262 | } |
263 | |
264 | FixedBuffer& operator=(std::string&& data) { |
265 | strncpy(data_, data.c_str(), sizeof(data_) - 1); |
266 | |
267 | return (*this); |
268 | } |
269 | }; |
270 | |
271 | struct StringBuffer { |
272 | char data_[1024]; |
273 | |
274 | StringBuffer() noexcept { |
275 | data_[0] = '\0'; |
276 | } |
277 | |
278 | StringBuffer& operator=(FixedBuffer& data) { |
279 | strncpy(data_, data.data_, sizeof(data_) - 1); |
280 | |
281 | return (*this); |
282 | } |
283 | }; |
284 | |
285 | std::string str("Test" ); |
286 | |
287 | const int capacity = 3; |
288 | LockFreeRingBuffer<FixedBuffer> rb(capacity); |
289 | rb.write(str); |
290 | |
291 | auto cursor = rb.currentTail(); |
292 | StringBuffer result; |
293 | EXPECT_TRUE(rb.tryRead(result, cursor)); |
294 | |
295 | EXPECT_EQ(str, result.data_); |
296 | } |
297 | |
298 | } // namespace folly |
299 | |