1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/synchronization/LifoSem.h>
18
19#include <thread>
20
21#include <folly/Benchmark.h>
22#include <folly/Random.h>
23#include <folly/portability/Asm.h>
24#include <folly/portability/GFlags.h>
25#include <folly/portability/GTest.h>
26#include <folly/portability/Semaphore.h>
27#include <folly/test/DeterministicSchedule.h>
28
29using namespace folly;
30using namespace folly::test;
31
32typedef LifoSemImpl<DeterministicAtomic> DLifoSem;
33typedef DeterministicSchedule DSched;
34
35LIFOSEM_DECLARE_POOL(DeterministicAtomic, 100000)
36
37class LifoSemTest : public testing::Test {
38 private:
39 // pre-init the pool to avoid deadlock when using DeterministicAtomic
40 using Node = detail::LifoSemRawNode<DeterministicAtomic>;
41 Node::Pool& pool_{Node::pool()};
42};
43
44TEST(LifoSem, basic) {
45 LifoSem sem;
46 EXPECT_FALSE(sem.tryWait());
47 sem.post();
48 EXPECT_TRUE(sem.tryWait());
49 sem.post();
50 sem.wait();
51}
52
53TEST(LifoSem, multi) {
54 LifoSem sem;
55
56 const int opsPerThread = 10000;
57 std::thread threads[10];
58 std::atomic<int> blocks(0);
59
60 for (auto& thr : threads) {
61 thr = std::thread([&] {
62 int b = 0;
63 for (int i = 0; i < opsPerThread; ++i) {
64 if (!sem.tryWait()) {
65 sem.wait();
66 ++b;
67 }
68 sem.post();
69 }
70 blocks += b;
71 });
72 }
73
74 // start the flood
75 sem.post();
76
77 for (auto& thr : threads) {
78 thr.join();
79 }
80
81 LOG(INFO) << opsPerThread * sizeof(threads) / sizeof(threads[0])
82 << " post/wait pairs, " << blocks << " blocked";
83}
84
85TEST_F(LifoSemTest, pingpong) {
86 DSched sched(DSched::uniform(0));
87
88 const int iters = 100;
89
90 for (int pass = 0; pass < 10; ++pass) {
91 DLifoSem a;
92 DLifoSem b;
93
94 auto thr = DSched::thread([&] {
95 for (int i = 0; i < iters; ++i) {
96 a.wait();
97 // main thread can't be running here
98 EXPECT_EQ(a.valueGuess(), 0);
99 EXPECT_EQ(b.valueGuess(), 0);
100 b.post();
101 }
102 });
103 for (int i = 0; i < iters; ++i) {
104 a.post();
105 b.wait();
106 // child thread can't be running here
107 EXPECT_EQ(a.valueGuess(), 0);
108 EXPECT_EQ(b.valueGuess(), 0);
109 }
110 DSched::join(thr);
111 }
112}
113
114TEST_F(LifoSemTest, mutex) {
115 DSched sched(DSched::uniform(0));
116
117 const int iters = 100;
118
119 for (int pass = 0; pass < 10; ++pass) {
120 DLifoSem a;
121
122 auto thr = DSched::thread([&] {
123 for (int i = 0; i < iters; ++i) {
124 a.wait();
125 a.post();
126 }
127 });
128 for (int i = 0; i < iters; ++i) {
129 a.post();
130 a.wait();
131 }
132 a.post();
133 DSched::join(thr);
134 a.wait();
135 }
136}
137
138TEST_F(LifoSemTest, no_blocking) {
139 long seed = folly::randomNumberSeed() % 10000;
140 LOG(INFO) << "seed=" << seed;
141 DSched sched(DSched::uniform(seed));
142
143 const int iters = 100;
144 const int numThreads = 2;
145 const int width = 10;
146
147 for (int pass = 0; pass < 10; ++pass) {
148 DLifoSem a;
149
150 std::vector<std::thread> threads;
151 while (threads.size() < numThreads) {
152 threads.emplace_back(DSched::thread([&] {
153 for (int i = 0; i < iters; ++i) {
154 a.post(width);
155 for (int w = 0; w < width; ++w) {
156 a.wait();
157 }
158 }
159 }));
160 }
161 for (auto& thr : threads) {
162 DSched::join(thr);
163 }
164 }
165}
166
167TEST_F(LifoSemTest, one_way) {
168 long seed = folly::randomNumberSeed() % 10000;
169 LOG(INFO) << "seed=" << seed;
170 DSched sched(DSched::uniformSubset(seed, 1, 6));
171
172 const int iters = 1000;
173
174 for (int pass = 0; pass < 10; ++pass) {
175 DLifoSem a;
176
177 auto thr = DSched::thread([&] {
178 for (int i = 0; i < iters; ++i) {
179 a.wait();
180 }
181 });
182 for (int i = 0; i < iters; ++i) {
183 a.post();
184 }
185 DSched::join(thr);
186 }
187}
188
189TEST_F(LifoSemTest, shutdown_wait_order) {
190 DLifoSem a;
191 a.shutdown();
192 a.post();
193 a.wait();
194 EXPECT_THROW(a.wait(), ShutdownSemError);
195 EXPECT_TRUE(a.isShutdown());
196}
197
198TEST_F(LifoSemTest, shutdown_multi) {
199 DSched sched(DSched::uniform(0));
200
201 for (int pass = 0; pass < 10; ++pass) {
202 DLifoSem a;
203 std::vector<std::thread> threads;
204 while (threads.size() < 20) {
205 threads.push_back(DSched::thread([&] {
206 try {
207 a.wait();
208 ADD_FAILURE();
209 } catch (ShutdownSemError&) {
210 // expected
211 EXPECT_TRUE(a.isShutdown());
212 }
213 }));
214 }
215 a.shutdown();
216 for (auto& thr : threads) {
217 DSched::join(thr);
218 }
219 }
220}
221
222TEST(LifoSem, multi_try_wait_simple) {
223 LifoSem sem;
224 sem.post(5);
225 auto n = sem.tryWait(10); // this used to trigger an assert
226 ASSERT_EQ(5, n);
227}
228
229TEST_F(LifoSemTest, multi_try_wait) {
230 long seed = folly::randomNumberSeed() % 10000;
231 LOG(INFO) << "seed=" << seed;
232 DSched sched(DSched::uniform(seed));
233 DLifoSem sem;
234
235 const int NPOSTS = 1000;
236
237 auto producer = [&] {
238 for (int i = 0; i < NPOSTS; ++i) {
239 sem.post();
240 }
241 };
242
243 DeterministicAtomic<bool> consumer_stop(false);
244 int consumed = 0;
245
246 auto consumer = [&] {
247 bool stop;
248 do {
249 stop = consumer_stop.load();
250 int n;
251 do {
252 n = sem.tryWait(10);
253 consumed += n;
254 } while (n > 0);
255 } while (!stop);
256 };
257
258 std::thread producer_thread(DSched::thread(producer));
259 std::thread consumer_thread(DSched::thread(consumer));
260 DSched::join(producer_thread);
261 consumer_stop.store(true);
262 DSched::join(consumer_thread);
263
264 ASSERT_EQ(NPOSTS, consumed);
265}
266
267TEST_F(LifoSemTest, timeout) {
268 long seed = folly::randomNumberSeed() % 10000;
269 LOG(INFO) << "seed=" << seed;
270 DSched sched(DSched::uniform(seed));
271 DeterministicAtomic<uint32_t> handoffs{0};
272
273 for (int pass = 0; pass < 10; ++pass) {
274 DLifoSem a;
275 std::vector<std::thread> threads;
276 while (threads.size() < 20) {
277 threads.push_back(DSched::thread([&] {
278 for (int i = 0; i < 10; i++) {
279 try {
280 if (a.try_wait_for(std::chrono::milliseconds(1))) {
281 handoffs--;
282 }
283 } catch (ShutdownSemError&) {
284 // expected
285 EXPECT_TRUE(a.isShutdown());
286 }
287 }
288 }));
289 }
290 std::vector<std::thread> threads2;
291 while (threads2.size() < 20) {
292 threads2.push_back(DSched::thread([&] {
293 for (int i = 0; i < 10; i++) {
294 a.post();
295 handoffs++;
296 }
297 }));
298 }
299 if (pass > 5) {
300 a.shutdown();
301 }
302 for (auto& thr : threads) {
303 DSched::join(thr);
304 }
305 for (auto& thr : threads2) {
306 DSched::join(thr);
307 }
308 // At least one timeout must occur.
309 EXPECT_GT(handoffs.load(), 0);
310 }
311}
312
313TEST_F(LifoSemTest, shutdown_try_wait_for) {
314 long seed = folly::randomNumberSeed() % 1000000;
315 LOG(INFO) << "seed=" << seed;
316 DSched sched(DSched::uniform(seed));
317
318 DLifoSem stopped;
319 std::thread worker1 = DSched::thread([&stopped] {
320 while (!stopped.isShutdown()) {
321 // i.e. poll for messages with timeout
322 LOG(INFO) << "thread polled";
323 }
324 });
325 std::thread worker2 = DSched::thread([&stopped] {
326 while (!stopped.isShutdown()) {
327 // Do some work every 1 second
328
329 try {
330 // this is normally 1 second in prod use case.
331 stopped.try_wait_for(std::chrono::milliseconds(1));
332 } catch (folly::ShutdownSemError& e) {
333 LOG(INFO) << "try_wait_for shutdown";
334 }
335 }
336 });
337
338 std::thread shutdown = DSched::thread([&stopped] {
339 LOG(INFO) << "LifoSem shutdown";
340 stopped.shutdown();
341 LOG(INFO) << "LifoSem shutdown done";
342 });
343
344 DSched::join(shutdown);
345 DSched::join(worker1);
346 DSched::join(worker2);
347 LOG(INFO) << "Threads joined";
348}
349
350BENCHMARK(lifo_sem_pingpong, iters) {
351 LifoSem a;
352 LifoSem b;
353 auto thr = std::thread([&] {
354 for (size_t i = 0; i < iters; ++i) {
355 a.wait();
356 b.post();
357 }
358 });
359 for (size_t i = 0; i < iters; ++i) {
360 a.post();
361 b.wait();
362 }
363 thr.join();
364}
365
366BENCHMARK(lifo_sem_oneway, iters) {
367 LifoSem a;
368 auto thr = std::thread([&] {
369 for (size_t i = 0; i < iters; ++i) {
370 a.wait();
371 }
372 });
373 for (size_t i = 0; i < iters; ++i) {
374 a.post();
375 }
376 thr.join();
377}
378
379BENCHMARK(single_thread_lifo_post, iters) {
380 LifoSem sem;
381 for (size_t n = 0; n < iters; ++n) {
382 sem.post();
383 asm_volatile_memory();
384 }
385}
386
387BENCHMARK(single_thread_lifo_wait, iters) {
388 LifoSem sem(iters);
389 for (size_t n = 0; n < iters; ++n) {
390 sem.wait();
391 asm_volatile_memory();
392 }
393}
394
395BENCHMARK(single_thread_lifo_postwait, iters) {
396 LifoSem sem;
397 for (size_t n = 0; n < iters; ++n) {
398 sem.post();
399 asm_volatile_memory();
400 sem.wait();
401 asm_volatile_memory();
402 }
403}
404
405BENCHMARK(single_thread_lifo_trywait, iters) {
406 LifoSem sem;
407 for (size_t n = 0; n < iters; ++n) {
408 EXPECT_FALSE(sem.tryWait());
409 asm_volatile_memory();
410 }
411}
412
413BENCHMARK(single_thread_posix_postwait, iters) {
414 sem_t sem;
415 EXPECT_EQ(sem_init(&sem, 0, 0), 0);
416 for (size_t n = 0; n < iters; ++n) {
417 EXPECT_EQ(sem_post(&sem), 0);
418 EXPECT_EQ(sem_wait(&sem), 0);
419 }
420 EXPECT_EQ(sem_destroy(&sem), 0);
421}
422
423BENCHMARK(single_thread_posix_trywait, iters) {
424 sem_t sem;
425 EXPECT_EQ(sem_init(&sem, 0, 0), 0);
426 for (size_t n = 0; n < iters; ++n) {
427 EXPECT_EQ(sem_trywait(&sem), -1);
428 }
429 EXPECT_EQ(sem_destroy(&sem), 0);
430}
431
432static void contendedUse(uint32_t n, int posters, int waiters) {
433 LifoSemImpl<std::atomic> sem;
434
435 std::vector<std::thread> threads;
436 std::atomic<bool> go(false);
437
438 BENCHMARK_SUSPEND {
439 for (int t = 0; t < waiters; ++t) {
440 threads.emplace_back([=, &sem] {
441 for (uint32_t i = t; i < n; i += waiters) {
442 sem.wait();
443 }
444 });
445 }
446 for (int t = 0; t < posters; ++t) {
447 threads.emplace_back([=, &sem, &go] {
448 while (!go.load()) {
449 std::this_thread::yield();
450 }
451 for (uint32_t i = t; i < n; i += posters) {
452 sem.post();
453 }
454 });
455 }
456 }
457
458 go.store(true);
459 for (auto& thr : threads) {
460 thr.join();
461 }
462}
463
464BENCHMARK_DRAW_LINE();
465BENCHMARK_NAMED_PARAM(contendedUse, 1_to_1, 1, 1)
466BENCHMARK_NAMED_PARAM(contendedUse, 1_to_4, 1, 4)
467BENCHMARK_NAMED_PARAM(contendedUse, 1_to_32, 1, 32)
468BENCHMARK_NAMED_PARAM(contendedUse, 4_to_1, 4, 1)
469BENCHMARK_NAMED_PARAM(contendedUse, 4_to_24, 4, 24)
470BENCHMARK_NAMED_PARAM(contendedUse, 8_to_100, 8, 100)
471BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1, 31, 1)
472BENCHMARK_NAMED_PARAM(contendedUse, 16_to_16, 16, 16)
473BENCHMARK_NAMED_PARAM(contendedUse, 32_to_32, 32, 32)
474BENCHMARK_NAMED_PARAM(contendedUse, 32_to_1000, 32, 1000)
475
476// sudo nice -n -20 _build/opt/folly/test/LifoSemTests
477// --benchmark --bm_min_iters=10000000 --gtest_filter=-\*
478// ============================================================================
479// folly/test/LifoSemTests.cpp relative time/iter iters/s
480// ============================================================================
481// lifo_sem_pingpong 1.31us 762.40K
482// lifo_sem_oneway 193.89ns 5.16M
483// single_thread_lifo_post 15.37ns 65.08M
484// single_thread_lifo_wait 13.60ns 73.53M
485// single_thread_lifo_postwait 29.43ns 33.98M
486// single_thread_lifo_trywait 677.69ps 1.48G
487// single_thread_posix_postwait 25.03ns 39.95M
488// single_thread_posix_trywait 7.30ns 136.98M
489// ----------------------------------------------------------------------------
490// contendedUse(1_to_1) 158.22ns 6.32M
491// contendedUse(1_to_4) 574.73ns 1.74M
492// contendedUse(1_to_32) 592.94ns 1.69M
493// contendedUse(4_to_1) 118.28ns 8.45M
494// contendedUse(4_to_24) 667.62ns 1.50M
495// contendedUse(8_to_100) 701.46ns 1.43M
496// contendedUse(32_to_1) 165.06ns 6.06M
497// contendedUse(16_to_16) 238.57ns 4.19M
498// contendedUse(32_to_32) 219.82ns 4.55M
499// contendedUse(32_to_1000) 777.42ns 1.29M
500// ============================================================================
501
502int main(int argc, char** argv) {
503 testing::InitGoogleTest(&argc, argv);
504 gflags::ParseCommandLineFlags(&argc, &argv, true);
505 int rv = RUN_ALL_TESTS();
506 folly::runBenchmarksOnFlag();
507 return rv;
508}
509