/*
 * Copyright 2011-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/ThreadCachedInt.h>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <thread>

#include <glog/logging.h>

#include <folly/Benchmark.h>
#include <folly/container/Foreach.h> // FOR_EACH_RANGE
#include <folly/hash/Hash.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Unistd.h> // usleep
#include <folly/system/ThreadId.h>

using namespace folly;

using std::unique_ptr;
using std::vector;

using Counter = ThreadCachedInt<int64_t>;

class ThreadCachedIntTest : public testing::Test {
 public:
  uint32_t GetDeadThreadsTotal(const Counter& counter) {
    return counter.readFast();
  }
};

// Multithreaded tests. Creates a specified number of threads, each of
// which iterates a different number of times and then dies.

namespace {
// Set cacheSize to be large so cached data moves to target_ only when
// a thread dies.
Counter g_counter_for_mt_slow(0, UINT32_MAX);
Counter g_counter_for_mt_fast(0, UINT32_MAX);

// Used to sync between threads. Runner() may proceed with iteration i
// only once i < g_sync_for_mt.
uint32_t g_sync_for_mt(0);
std::condition_variable cv;
std::mutex cv_m;

// Performs the specified number of iterations. Within each iteration,
// it increments the counter 10 times. At the beginning of each
// iteration it checks g_sync_for_mt to see whether it can proceed, and
// otherwise waits on the condition variable until it may.
void Runner(Counter* counter, uint32_t iterations) {
  for (uint32_t i = 0; i < iterations; ++i) {
    std::unique_lock<std::mutex> lk(cv_m);
    cv.wait(lk, [i] { return i < g_sync_for_mt; });
    for (uint32_t j = 0; j < 10; ++j) {
      counter->increment(1);
    }
  }
}
} // namespace

// Slow test with fewer threads where there are more busy waits and
// many calls to readFull(). This attempts to test as many of the
// code paths in Counter as possible to ensure that counter values are
// properly passed from thread local state, both at calls to
// readFull() and at thread death.
TEST_F(ThreadCachedIntTest, MultithreadedSlow) {
  static constexpr uint32_t kNumThreads = 20;
  g_sync_for_mt = 0;
  vector<unique_ptr<std::thread>> threads(kNumThreads);
  // Creates kNumThreads threads. Each thread performs a different
  // number of iterations in Runner() - threads[0] performs 1
  // iteration, threads[1] performs 2 iterations, threads[2] performs
  // 3 iterations, and so on.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i] =
        std::make_unique<std::thread>(Runner, &g_counter_for_mt_slow, i + 1);
  }
  // Variable to grab the current counter value.
  int32_t counter_value;
  // The expected value of the counter.
  int32_t total = 0;
  // The expected value of GetDeadThreadsTotal().
  int32_t dead_total = 0;
  // Each iteration of the following loop allows one additional
  // iteration of the threads. Given that the threads perform
  // different numbers of iterations from 1 through kNumThreads, one
  // thread will complete in each iteration of the loop below.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    // Allow iterations up to index i on all threads.
    {
      std::lock_guard<std::mutex> lk(cv_m);
      g_sync_for_mt = i + 1;
    }
    cv.notify_all();
    total += (kNumThreads - i) * 10;
    // Loop until the counter reaches its expected value.
    do {
      counter_value = g_counter_for_mt_slow.readFull();
    } while (counter_value < total);
    // All threads have done what they can until iteration i; now make
    // sure they don't go further by checking 10 more times in the
    // following loop.
    for (uint32_t j = 0; j < 10; ++j) {
      counter_value = g_counter_for_mt_slow.readFull();
      EXPECT_EQ(total, counter_value);
    }
    dead_total += (i + 1) * 10;
    EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
  }
  // All threads are done.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i]->join();
  }
  counter_value = g_counter_for_mt_slow.readFull();
  EXPECT_EQ(total, counter_value);
  EXPECT_EQ(total, dead_total);
  EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
}

// Fast test with lots of threads and only one call to readFull()
// at the end.
TEST_F(ThreadCachedIntTest, MultithreadedFast) {
  static constexpr uint32_t kNumThreads = 1000;
  g_sync_for_mt = 0;
  vector<unique_ptr<std::thread>> threads(kNumThreads);
  // Creates kNumThreads threads. Each thread performs a different
  // number of iterations in Runner() - threads[0] performs 1
  // iteration, threads[1] performs 2 iterations, threads[2] performs
  // 3 iterations, and so on.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i] =
        std::make_unique<std::thread>(Runner, &g_counter_for_mt_fast, i + 1);
  }
  // Let the threads run to completion.
  {
    std::lock_guard<std::mutex> lk(cv_m);
    g_sync_for_mt = kNumThreads;
  }
  cv.notify_all();
  // The expected value of the counter.
  uint32_t total = 0;
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    total += (kNumThreads - i) * 10;
  }
  // Wait for all threads to complete.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i]->join();
  }
  int32_t counter_value = g_counter_for_mt_fast.readFull();
  EXPECT_EQ(total, counter_value);
  EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast));
}

TEST(ThreadCachedInt, SingleThreadedNotCached) {
  ThreadCachedInt<int64_t> val(0, 0);
  EXPECT_EQ(0, val.readFast());
  ++val;
  EXPECT_EQ(1, val.readFast());
  for (int i = 0; i < 41; ++i) {
    val.increment(1);
  }
  EXPECT_EQ(42, val.readFast());
  --val;
  EXPECT_EQ(41, val.readFast());
}

// Note: This is somewhat fragile to the implementation. If this causes
// problems, feel free to remove it.
TEST(ThreadCachedInt, SingleThreadedCached) {
  ThreadCachedInt<int64_t> val(0, 10);
  EXPECT_EQ(0, val.readFast());
  ++val;
  EXPECT_EQ(0, val.readFast());
  for (int i = 0; i < 7; ++i) {
    val.increment(1);
  }
  EXPECT_EQ(0, val.readFast());
  EXPECT_EQ(0, val.readFastAndReset());
  EXPECT_EQ(8, val.readFull());
  EXPECT_EQ(8, val.readFullAndReset());
  EXPECT_EQ(0, val.readFull());
  EXPECT_EQ(0, val.readFast());
}
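
// A minimal usage sketch: a worker thread's cached increments are folded
// into the shared total when that thread exits, so readFull() observes them
// after join(). This relies only on the flush-on-thread-death behavior
// exercised by the multithreaded tests above; the cacheSize of 1000 is an
// arbitrary value chosen so that no flush happens before the thread exits.
TEST(ThreadCachedInt, UsageSketch) {
  ThreadCachedInt<int64_t> counter(0, 1000);
  std::thread worker([&] {
    for (int i = 0; i < 100; ++i) {
      counter.increment(1);
    }
  });
  worker.join();
  // readFull() aggregates the shared value plus all thread caches; the
  // worker's cache was flushed into the shared value when it died.
  EXPECT_EQ(100, counter.readFull());
}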

ThreadCachedInt<int32_t> globalInt32(0, 11);
ThreadCachedInt<int64_t> globalInt64(0, 11);
int kNumInserts = 100000;
DEFINE_int32(numThreads, 8, "Number of simultaneous threads for benchmarks.");
#define CREATE_INC_FUNC(size) \
  void incFunc##size() { \
    const int num = kNumInserts / FLAGS_numThreads; \
    for (int i = 0; i < num; ++i) { \
      ++globalInt##size; \
    } \
  }
CREATE_INC_FUNC(64)
CREATE_INC_FUNC(32)

// Confirms counts are accurate with competing threads
TEST(ThreadCachedInt, MultiThreadedCached) {
  kNumInserts = 100000;
  CHECK_EQ(0, kNumInserts % FLAGS_numThreads)
      << "FLAGS_numThreads must evenly divide kNumInserts (" << kNumInserts
      << ").";
  const int numPerThread = kNumInserts / FLAGS_numThreads;
  ThreadCachedInt<int64_t> TCInt64(0, numPerThread - 2);
  {
    std::atomic<bool> run(true);
    std::atomic<int> threadsDone(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < FLAGS_numThreads; ++i) {
      threads.push_back(std::thread([&] {
        FOR_EACH_RANGE (k, 0, numPerThread) { ++TCInt64; }
        std::atomic_fetch_add(&threadsDone, 1);
        while (run.load()) {
          usleep(100);
        }
      }));
    }

    // We create and increment another ThreadCachedInt here to make sure it
    // doesn't interact with the other instances
    ThreadCachedInt<int64_t> otherTCInt64(0, 10);
    otherTCInt64.set(33);
    ++otherTCInt64;

    while (threadsDone.load() < FLAGS_numThreads) {
      usleep(100);
    }

    ++otherTCInt64;

    // Threads are done incrementing, but caches have not been flushed yet, so
    // we have to readFull.
    EXPECT_NE(kNumInserts, TCInt64.readFast());
    EXPECT_EQ(kNumInserts, TCInt64.readFull());

    run.store(false);
    for (auto& t : threads) {
      t.join();
    }

  } // Caches are flushed when threads finish
  EXPECT_EQ(kNumInserts, TCInt64.readFast());
}

#define MAKE_MT_CACHE_SIZE_BM(size) \
  void BM_mt_cache_size##size(int iters, int cacheSize) { \
    kNumInserts = iters; \
    globalInt##size.set(0); \
    globalInt##size.setCacheSize(cacheSize); \
    std::vector<std::thread> threads; \
    for (int i = 0; i < FLAGS_numThreads; ++i) { \
      threads.push_back(std::thread(incFunc##size)); \
    } \
    for (auto& t : threads) { \
      t.join(); \
    } \
  }
MAKE_MT_CACHE_SIZE_BM(64)
MAKE_MT_CACHE_SIZE_BM(32)

#define REG_BASELINE(name, inc_stmt) \
  BENCHMARK(FB_CONCATENATE(BM_mt_baseline_, name), iters) { \
    const int iterPerThread = iters / FLAGS_numThreads; \
    std::vector<std::thread> threads; \
    for (int i = 0; i < FLAGS_numThreads; ++i) { \
      threads.push_back(std::thread([&]() { \
        for (int j = 0; j < iterPerThread; ++j) { \
          inc_stmt; \
        } \
      })); \
    } \
    for (auto& t : threads) { \
      t.join(); \
    } \
  }
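
// For example, REG_BASELINE(ShardedAtm64, shd_int64.inc()) below expands to a
// benchmark named BM_mt_baseline_ShardedAtm64 in which FLAGS_numThreads
// threads each execute iters / FLAGS_numThreads iterations of the given
// statement.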

ThreadLocal<int64_t> globalTL64Baseline;
ThreadLocal<int32_t> globalTL32Baseline;
std::atomic<int64_t> globalInt64Baseline(0);
std::atomic<int32_t> globalInt32Baseline(0);
FOLLY_TLS int64_t global__thread64;
FOLLY_TLS int32_t global__thread32;

// Alternate lock-free implementation. Achieves about the same performance,
// but uses about 20x more memory than ThreadCachedInt with 24 threads.
struct ShardedAtomicInt {
  static const int64_t kBuckets_ = 2048;
  std::atomic<int64_t> ints_[kBuckets_];

  inline void inc(int64_t val = 1) {
    int buck = hash::twang_mix64(folly::getCurrentThreadID()) & (kBuckets_ - 1);
    std::atomic_fetch_add(&ints_[buck], val);
  }

  // read the first few and extrapolate
  int64_t readFast() {
    int64_t ret = 0;
    static const int numToRead = 8;
    FOR_EACH_RANGE (i, 0, numToRead) {
      ret += ints_[i].load(std::memory_order_relaxed);
    }
    return ret * (kBuckets_ / numToRead);
  }

  // readFull is lock-free, but has to do thousands of loads...
  int64_t readFull() {
    int64_t ret = 0;
    for (auto& i : ints_) {
      // Fun fact - using memory_order_consume below reduces perf 30-40% in
      // high contention benchmarks.
      ret += i.load(std::memory_order_relaxed);
    }
    return ret;
  }
};
ShardedAtomicInt shd_int64;
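
// Rough arithmetic behind the footprint comment on ShardedAtomicInt above (an
// estimate, not a measured number): kBuckets_ (2048) 8-byte atomics come to
// about 16 KB per counter regardless of thread count, whereas ThreadCachedInt
// only keeps a small per-thread cache plus the shared target.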

REG_BASELINE(_thread64, global__thread64 += 1)
REG_BASELINE(_thread32, global__thread32 += 1)
REG_BASELINE(ThreadLocal64, *globalTL64Baseline += 1)
REG_BASELINE(ThreadLocal32, *globalTL32Baseline += 1)
REG_BASELINE(
    atomic_inc64,
    std::atomic_fetch_add(&globalInt64Baseline, int64_t(1)))
REG_BASELINE(
    atomic_inc32,
    std::atomic_fetch_add(&globalInt32Baseline, int32_t(1)))
REG_BASELINE(ShardedAtm64, shd_int64.inc())

BENCHMARK_PARAM(BM_mt_cache_size64, 0)
BENCHMARK_PARAM(BM_mt_cache_size64, 10)
BENCHMARK_PARAM(BM_mt_cache_size64, 100)
BENCHMARK_PARAM(BM_mt_cache_size64, 1000)
BENCHMARK_PARAM(BM_mt_cache_size32, 0)
BENCHMARK_PARAM(BM_mt_cache_size32, 10)
BENCHMARK_PARAM(BM_mt_cache_size32, 100)
BENCHMARK_PARAM(BM_mt_cache_size32, 1000)
BENCHMARK_DRAW_LINE();

// single threaded
BENCHMARK(Atomic_readFull) {
  doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed));
}
BENCHMARK(ThrCache_readFull) {
  doNotOptimizeAway(globalInt64.readFull());
}
BENCHMARK(Sharded_readFull) {
  doNotOptimizeAway(shd_int64.readFull());
}
BENCHMARK(ThrCache_readFast) {
  doNotOptimizeAway(globalInt64.readFast());
}
BENCHMARK(Sharded_readFast) {
  doNotOptimizeAway(shd_int64.readFast());
}
BENCHMARK_DRAW_LINE();

// multi threaded
REG_BASELINE(
    Atomic_readFull,
    doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed)))
REG_BASELINE(ThrCache_readFull, doNotOptimizeAway(globalInt64.readFull()))
REG_BASELINE(Sharded_readFull, doNotOptimizeAway(shd_int64.readFull()))
REG_BASELINE(ThrCache_readFast, doNotOptimizeAway(globalInt64.readFast()))
REG_BASELINE(Sharded_readFast, doNotOptimizeAway(shd_int64.readFast()))
BENCHMARK_DRAW_LINE();

int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  gflags::SetCommandLineOptionWithMode(
      "bm_min_usec", "10000", gflags::SET_FLAG_IF_DEFAULT);
  if (FLAGS_benchmark) {
    folly::runBenchmarks();
  }
  return RUN_ALL_TESTS();
}
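
// The benchmarks above only run when --benchmark is passed on the command
// line; the --numThreads flag controls how many worker threads the
// multithreaded benchmarks spawn. Plain invocations run just the gtest cases.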

/*
 Ran with 20 threads on dual 12-core Xeon(R) X5650 @ 2.67GHz with 12-MB caches

        Benchmark                           Iters   Total t    t/iter  iter/sec
------------------------------------------------------------------------------
+ 103%  BM_mt_baseline__thread64         10000000  13.54 ms  1.354 ns   704.4 M
     *  BM_mt_baseline__thread32         10000000  6.651 ms  665.1 ps     1.4 G
+50.3%  BM_mt_baseline_ThreadLocal64     10000000  9.994 ms  999.4 ps   954.2 M
+49.9%  BM_mt_baseline_ThreadLocal32     10000000  9.972 ms  997.2 ps   956.4 M
+2650%  BM_mt_baseline_atomic_inc64      10000000  182.9 ms  18.29 ns   52.13 M
+2665%  BM_mt_baseline_atomic_inc32      10000000  183.9 ms  18.39 ns   51.85 M
+75.3%  BM_mt_baseline_ShardedAtm64      10000000  11.66 ms  1.166 ns   817.8 M
+6670%  BM_mt_cache_size64/0             10000000  450.3 ms  45.03 ns   21.18 M
+1644%  BM_mt_cache_size64/10            10000000    116 ms   11.6 ns    82.2 M
+ 381%  BM_mt_cache_size64/100           10000000  32.04 ms  3.204 ns   297.7 M
+ 129%  BM_mt_cache_size64/1000          10000000  15.24 ms  1.524 ns   625.8 M
+6052%  BM_mt_cache_size32/0             10000000  409.2 ms  40.92 ns   23.31 M
+1304%  BM_mt_cache_size32/10            10000000  93.39 ms  9.339 ns   102.1 M
+ 298%  BM_mt_cache_size32/100           10000000  26.52 ms  2.651 ns   359.7 M
+68.1%  BM_mt_cache_size32/1000          10000000  11.18 ms  1.118 ns   852.9 M
------------------------------------------------------------------------------
+10.4%  Atomic_readFull                  10000000  36.05 ms  3.605 ns   264.5 M
+ 619%  ThrCache_readFull                10000000  235.1 ms  23.51 ns   40.57 M
  SLOW  Sharded_readFull                  1981093       2 s   1.01 us   967.3 k
     *  ThrCache_readFast                10000000  32.65 ms  3.265 ns   292.1 M
+10.0%  Sharded_readFast                 10000000  35.92 ms  3.592 ns   265.5 M
------------------------------------------------------------------------------
+4.54%  BM_mt_baseline_Atomic_readFull   10000000  8.672 ms  867.2 ps   1.074 G
  SLOW  BM_mt_baseline_ThrCache_readFull 10000000  996.9 ms  99.69 ns   9.567 M
  SLOW  BM_mt_baseline_Sharded_readFull  10000000  891.5 ms  89.15 ns    10.7 M
     *  BM_mt_baseline_ThrCache_readFast 10000000  8.295 ms  829.5 ps   1.123 G
+12.7%  BM_mt_baseline_Sharded_readFast  10000000  9.348 ms  934.8 ps    1020 M
------------------------------------------------------------------------------
*/