1 | /* |
2 | * Copyright 2011-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/ThreadCachedInt.h> |
18 | |
19 | #include <atomic> |
20 | #include <condition_variable> |
21 | #include <memory> |
22 | #include <thread> |
23 | |
24 | #include <glog/logging.h> |
25 | |
26 | #include <folly/Benchmark.h> |
27 | #include <folly/hash/Hash.h> |
28 | #include <folly/portability/GFlags.h> |
29 | #include <folly/portability/GTest.h> |
30 | #include <folly/system/ThreadId.h> |
31 | |
32 | using namespace folly; |
33 | |
34 | using std::unique_ptr; |
35 | using std::vector; |
36 | |
37 | using Counter = ThreadCachedInt<int64_t>; |
38 | |
39 | class ThreadCachedIntTest : public testing::Test { |
40 | public: |
41 | uint32_t GetDeadThreadsTotal(const Counter& counter) { |
42 | return counter.readFast(); |
43 | } |
44 | }; |
45 | |
46 | // Multithreaded tests. Creates a specified number of threads each of |
47 | // which iterates a different amount and dies. |
48 | |
namespace {
// Set cacheSize to be large so cached data moves to target_ only when
// thread dies.
Counter g_counter_for_mt_slow(0, UINT32_MAX);
Counter g_counter_for_mt_fast(0, UINT32_MAX);

// Used to sync between threads. The value of this variable is the
// maximum iteration index up to which Runner() is allowed to go.
// Written only while holding cv_m; readers wait on cv.
uint32_t g_sync_for_mt(0);
std::condition_variable cv;
std::mutex cv_m;

// Performs the specified number of iterations. Within each
// iteration, it increments counter 10 times. At the beginning of
// each iteration it blocks on cv until g_sync_for_mt permits
// iteration i to proceed.
// NOTE(review): cv_m stays locked while incrementing; harmless here since
// the object under test is the counter, not the mutex.
void Runner(Counter* counter, uint32_t iterations) {
  for (uint32_t i = 0; i < iterations; ++i) {
    std::unique_lock<std::mutex> lk(cv_m);
    cv.wait(lk, [i] { return i < g_sync_for_mt; });
    for (uint32_t j = 0; j < 10; ++j) {
      counter->increment(1);
    }
  }
}
} // namespace
75 | |
76 | // Slow test with fewer threads where there are more busy waits and |
77 | // many calls to readFull(). This attempts to test as many of the |
78 | // code paths in Counter as possible to ensure that counter values are |
79 | // properly passed from thread local state, both at calls to |
80 | // readFull() and at thread death. |
TEST_F(ThreadCachedIntTest, MultithreadedSlow) {
  static constexpr uint32_t kNumThreads = 20;
  g_sync_for_mt = 0;
  vector<unique_ptr<std::thread>> threads(kNumThreads);
  // Creates kNumThreads threads. Each thread performs a different
  // number of iterations in Runner() - threads[0] performs 1
  // iteration, threads[1] performs 2 iterations, threads[2] performs
  // 3 iterations, and so on.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i] =
        std::make_unique<std::thread>(Runner, &g_counter_for_mt_slow, i + 1);
  }
  // Variable to grab current counter value.
  int32_t counter_value;
  // The expected value of the counter.
  int32_t total = 0;
  // The expected value of GetDeadThreadsTotal(): the fast-read total,
  // which only reflects caches flushed when threads exit.
  int32_t dead_total = 0;
  // Each iteration of the following loop allows one additional
  // iteration of the threads. Given that the threads perform
  // different number of iterations from 1 through kNumThreads, one
  // thread will complete in each of the iterations of the loop below.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    // Allow up to iteration i on all threads.
    {
      std::lock_guard<std::mutex> lk(cv_m);
      g_sync_for_mt = i + 1;
    }
    cv.notify_all();
    // kNumThreads - i threads are still running at step i, and each one
    // adds 10 increments this round.
    total += (kNumThreads - i) * 10;
    // Loop until the counter reaches its expected value.
    do {
      counter_value = g_counter_for_mt_slow.readFull();
    } while (counter_value < total);
    // All threads have done what they can until iteration i, now make
    // sure they don't go further by checking 10 more times in the
    // following loop.
    for (uint32_t j = 0; j < 10; ++j) {
      counter_value = g_counter_for_mt_slow.readFull();
      EXPECT_EQ(total, counter_value);
    }
    // The thread with exactly i+1 iterations finishes during this round;
    // its 10*(i+1) increments eventually reach the dead-thread total, so
    // the fast read can only be <= dead_total at this point.
    dead_total += (i + 1) * 10;
    EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
  }
  // All threads are done.
  for (uint32_t i = 0; i < kNumThreads; ++i) {
    threads[i]->join();
  }
  counter_value = g_counter_for_mt_slow.readFull();
  EXPECT_EQ(total, counter_value);
  EXPECT_EQ(total, dead_total);
  EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
}
134 | |
135 | // Fast test with lots of threads and only one call to readFull() |
136 | // at the end. |
137 | TEST_F(ThreadCachedIntTest, MultithreadedFast) { |
138 | static constexpr uint32_t kNumThreads = 1000; |
139 | g_sync_for_mt = 0; |
140 | vector<unique_ptr<std::thread>> threads(kNumThreads); |
141 | // Creates kNumThreads threads. Each thread performs a different |
142 | // number of iterations in Runner() - threads[0] performs 1 |
143 | // iteration, threads[1] performs 2 iterations, threads[2] performs |
144 | // 3 iterations, and so on. |
145 | for (uint32_t i = 0; i < kNumThreads; ++i) { |
146 | threads[i] = |
147 | std::make_unique<std::thread>(Runner, &g_counter_for_mt_fast, i + 1); |
148 | } |
149 | // Let the threads run to completion. |
150 | { |
151 | std::lock_guard<std::mutex> lk(cv_m); |
152 | g_sync_for_mt = kNumThreads; |
153 | } |
154 | cv.notify_all(); |
155 | // The expected value of the counter. |
156 | uint32_t total = 0; |
157 | for (uint32_t i = 0; i < kNumThreads; ++i) { |
158 | total += (kNumThreads - i) * 10; |
159 | } |
160 | // Wait for all threads to complete. |
161 | for (uint32_t i = 0; i < kNumThreads; ++i) { |
162 | threads[i]->join(); |
163 | } |
164 | int32_t counter_value = g_counter_for_mt_fast.readFull(); |
165 | EXPECT_EQ(total, counter_value); |
166 | EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast)); |
167 | } |
168 | |
169 | TEST(ThreadCachedInt, SingleThreadedNotCached) { |
170 | ThreadCachedInt<int64_t> val(0, 0); |
171 | EXPECT_EQ(0, val.readFast()); |
172 | ++val; |
173 | EXPECT_EQ(1, val.readFast()); |
174 | for (int i = 0; i < 41; ++i) { |
175 | val.increment(1); |
176 | } |
177 | EXPECT_EQ(42, val.readFast()); |
178 | --val; |
179 | EXPECT_EQ(41, val.readFast()); |
180 | } |
181 | |
// Note: This is somewhat fragile to the implementation. If this causes
// problems, feel free to remove it.
TEST(ThreadCachedIntTest, SingleThreadedCached) {
  // cacheSize 10: the 8 updates below all stay in the thread-local cache,
  // so the fast (approximate) read keeps returning 0.
  ThreadCachedInt<int64_t> val(0, 10);
  EXPECT_EQ(0, val.readFast());
  ++val;
  EXPECT_EQ(0, val.readFast()); // still cached, invisible to the fast path
  for (int i = 0; i < 7; ++i) {
    val.increment(1);
  }
  EXPECT_EQ(0, val.readFast());
  // Resetting the fast value does not touch the cached 8, as the
  // following readFull() demonstrates.
  EXPECT_EQ(0, val.readFastAndReset());
  EXPECT_EQ(8, val.readFull()); // aggregates the thread-local cache
  EXPECT_EQ(8, val.readFullAndReset());
  EXPECT_EQ(0, val.readFull());
  EXPECT_EQ(0, val.readFast());
}
199 | |
200 | ThreadCachedInt<int32_t> globalInt32(0, 11); |
201 | ThreadCachedInt<int64_t> globalInt64(0, 11); |
202 | int kNumInserts = 100000; |
203 | DEFINE_int32(numThreads, 8, "Number simultaneous threads for benchmarks." ); |
// Generates incFunc64/incFunc32: each performs one thread's share
// (kNumInserts / FLAGS_numThreads) of single increments on the matching
// global ThreadCachedInt. Comments must stay outside the macro body
// because of the line continuations.
#define CREATE_INC_FUNC(size)                          \
  void incFunc##size() {                               \
    const int num = kNumInserts / FLAGS_numThreads;    \
    for (int i = 0; i < num; ++i) {                    \
      ++globalInt##size;                               \
    }                                                  \
  }
CREATE_INC_FUNC(64)
CREATE_INC_FUNC(32)
213 | |
// Confirms counts are accurate with competing threads
TEST(ThreadCachedInt, MultiThreadedCached) {
  kNumInserts = 100000;
  CHECK_EQ(0, kNumInserts % FLAGS_numThreads)
      << "FLAGS_numThreads must evenly divide kNumInserts (" << kNumInserts
      << ")." ;
  const int numPerThread = kNumInserts / FLAGS_numThreads;
  // cacheSize of numPerThread - 2 means each thread still holds a cached
  // remainder after its last increment (verified by the EXPECT_NE below).
  ThreadCachedInt<int64_t> TCInt64(0, numPerThread - 2);
  {
    std::atomic<bool> run(true);
    std::atomic<int> threadsDone(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < FLAGS_numThreads; ++i) {
      threads.push_back(std::thread([&] {
        FOR_EACH_RANGE (k, 0, numPerThread) { ++TCInt64; }
        std::atomic_fetch_add(&threadsDone, 1);
        // Stay alive (keeping the thread-local cache unflushed) until the
        // main thread has done its readFast/readFull checks.
        while (run.load()) {
          usleep(100);
        }
      }));
    }

    // We create and increment another ThreadCachedInt here to make sure it
    // doesn't interact with the other instances
    ThreadCachedInt<int64_t> otherTCInt64(0, 10);
    otherTCInt64.set(33);
    ++otherTCInt64;

    // Wait until every worker has finished incrementing (but not exited).
    while (threadsDone.load() < FLAGS_numThreads) {
      usleep(100);
    }

    ++otherTCInt64;

    // Threads are done incrementing, but caches have not been flushed yet, so
    // we have to readFull.
    EXPECT_NE(kNumInserts, TCInt64.readFast());
    EXPECT_EQ(kNumInserts, TCInt64.readFull());

    run.store(false);
    for (auto& t : threads) {
      t.join();
    }

  } // Caches are flushed when threads finish
  EXPECT_EQ(kNumInserts, TCInt64.readFast());
}
261 | |
// Generates BM_mt_cache_size64/BM_mt_cache_size32: multi-threaded increment
// benchmarks parameterized by the ThreadCachedInt cache size. Splits the
// `iters` budget over FLAGS_numThreads threads via incFunc##size.
#define MAKE_MT_CACHE_SIZE_BM(size)                    \
  void BM_mt_cache_size##size(int iters, int cacheSize) { \
    kNumInserts = iters;                               \
    globalInt##size.set(0);                            \
    globalInt##size.setCacheSize(cacheSize);           \
    std::vector<std::thread> threads;                  \
    for (int i = 0; i < FLAGS_numThreads; ++i) {       \
      threads.push_back(std::thread(incFunc##size));   \
    }                                                  \
    for (auto& t : threads) {                          \
      t.join();                                        \
    }                                                  \
  }
MAKE_MT_CACHE_SIZE_BM(64)
MAKE_MT_CACHE_SIZE_BM(32)
277 | |
// Registers a multi-threaded baseline benchmark named BM_mt_baseline_<name>
// that divides `iters` across FLAGS_numThreads threads, each running
// `inc_stmt` in a tight loop.
#define REG_BASELINE(name, inc_stmt)                     \
  BENCHMARK(FB_CONCATENATE(BM_mt_baseline_, name), iters) { \
    const int iterPerThread = iters / FLAGS_numThreads;  \
    std::vector<std::thread> threads;                    \
    for (int i = 0; i < FLAGS_numThreads; ++i) {         \
      threads.push_back(std::thread([&]() {              \
        for (int j = 0; j < iterPerThread; ++j) {        \
          inc_stmt;                                      \
        }                                                \
      }));                                               \
    }                                                    \
    for (auto& t : threads) {                            \
      t.join();                                          \
    }                                                    \
  }
293 | |
// Baseline counters used by the REG_BASELINE benchmarks to compare
// ThreadCachedInt against folly::ThreadLocal and shared atomics.
ThreadLocal<int64_t> globalTL64Baseline;
ThreadLocal<int32_t> globalTL32Baseline;
std::atomic<int64_t> globalInt64Baseline(0);
std::atomic<int32_t> globalInt32Baseline(0);
// Raw compiler-TLS variants - the cheapest possible baseline.
FOLLY_TLS int64_t global__thread64;
FOLLY_TLS int32_t global__thread32;
300 | |
301 | // Alternate lock-free implementation. Achieves about the same performance, |
302 | // but uses about 20x more memory than ThreadCachedInt with 24 threads. |
303 | struct ShardedAtomicInt { |
304 | static const int64_t kBuckets_ = 2048; |
305 | std::atomic<int64_t> ints_[kBuckets_]; |
306 | |
307 | inline void inc(int64_t val = 1) { |
308 | int buck = hash::twang_mix64(folly::getCurrentThreadID()) & (kBuckets_ - 1); |
309 | std::atomic_fetch_add(&ints_[buck], val); |
310 | } |
311 | |
312 | // read the first few and extrapolate |
313 | int64_t readFast() { |
314 | int64_t ret = 0; |
315 | static const int numToRead = 8; |
316 | FOR_EACH_RANGE (i, 0, numToRead) { |
317 | ret += ints_[i].load(std::memory_order_relaxed); |
318 | } |
319 | return ret * (kBuckets_ / numToRead); |
320 | } |
321 | |
322 | // readFull is lock-free, but has to do thousands of loads... |
323 | int64_t readFull() { |
324 | int64_t ret = 0; |
325 | for (auto& i : ints_) { |
326 | // Fun fact - using memory_order_consume below reduces perf 30-40% in high |
327 | // contention benchmarks. |
328 | ret += i.load(std::memory_order_relaxed); |
329 | } |
330 | return ret; |
331 | } |
332 | }; |
333 | ShardedAtomicInt shd_int64; |
334 | |
// Baseline increment benchmarks: raw TLS, folly::ThreadLocal, shared
// atomics, and the sharded-atomic alternative.
REG_BASELINE(_thread64, global__thread64 += 1)
REG_BASELINE(_thread32, global__thread32 += 1)
REG_BASELINE(ThreadLocal64, *globalTL64Baseline += 1)
REG_BASELINE(ThreadLocal32, *globalTL32Baseline += 1)
REG_BASELINE(
    atomic_inc64,
    std::atomic_fetch_add(&globalInt64Baseline, int64_t(1)))
REG_BASELINE(
    atomic_inc32,
    std::atomic_fetch_add(&globalInt32Baseline, int32_t(1)))
REG_BASELINE(ShardedAtm64, shd_int64.inc())

// ThreadCachedInt increment cost as a function of cache size (0 means no
// caching, i.e. every increment hits the shared target).
BENCHMARK_PARAM(BM_mt_cache_size64, 0)
BENCHMARK_PARAM(BM_mt_cache_size64, 10)
BENCHMARK_PARAM(BM_mt_cache_size64, 100)
BENCHMARK_PARAM(BM_mt_cache_size64, 1000)
BENCHMARK_PARAM(BM_mt_cache_size32, 0)
BENCHMARK_PARAM(BM_mt_cache_size32, 10)
BENCHMARK_PARAM(BM_mt_cache_size32, 100)
BENCHMARK_PARAM(BM_mt_cache_size32, 1000)
BENCHMARK_DRAW_LINE();

// single threaded read-path benchmarks
BENCHMARK(Atomic_readFull) {
  doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed));
}
BENCHMARK(ThrCache_readFull) {
  doNotOptimizeAway(globalInt64.readFull());
}
BENCHMARK(Sharded_readFull) {
  doNotOptimizeAway(shd_int64.readFull());
}
BENCHMARK(ThrCache_readFast) {
  doNotOptimizeAway(globalInt64.readFast());
}
BENCHMARK(Sharded_readFast) {
  doNotOptimizeAway(shd_int64.readFast());
}
BENCHMARK_DRAW_LINE();

// multi threaded read-path benchmarks (concurrent readers)
REG_BASELINE(
    Atomic_readFull,
    doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed)))
REG_BASELINE(ThrCache_readFull, doNotOptimizeAway(globalInt64.readFull()))
REG_BASELINE(Sharded_readFull, doNotOptimizeAway(shd_int64.readFull()))
REG_BASELINE(ThrCache_readFast, doNotOptimizeAway(globalInt64.readFast()))
REG_BASELINE(Sharded_readFast, doNotOptimizeAway(shd_int64.readFast()))
BENCHMARK_DRAW_LINE();
384 | |
385 | int main(int argc, char** argv) { |
386 | testing::InitGoogleTest(&argc, argv); |
387 | gflags::ParseCommandLineFlags(&argc, &argv, true); |
388 | gflags::SetCommandLineOptionWithMode( |
389 | "bm_min_usec" , "10000" , gflags::SET_FLAG_IF_DEFAULT); |
390 | if (FLAGS_benchmark) { |
391 | folly::runBenchmarks(); |
392 | } |
393 | return RUN_ALL_TESTS(); |
394 | } |
395 | |
396 | /* |
397 | Ran with 20 threads on dual 12-core Xeon(R) X5650 @ 2.67GHz with 12-MB caches |
398 | |
399 | Benchmark Iters Total t t/iter iter/sec |
400 | ------------------------------------------------------------------------------ |
401 | + 103% BM_mt_baseline__thread64 10000000 13.54 ms 1.354 ns 704.4 M |
402 | * BM_mt_baseline__thread32 10000000 6.651 ms 665.1 ps 1.4 G |
403 | +50.3% BM_mt_baseline_ThreadLocal64 10000000 9.994 ms 999.4 ps 954.2 M |
404 | +49.9% BM_mt_baseline_ThreadLocal32 10000000 9.972 ms 997.2 ps 956.4 M |
405 | +2650% BM_mt_baseline_atomic_inc64 10000000 182.9 ms 18.29 ns 52.13 M |
406 | +2665% BM_mt_baseline_atomic_inc32 10000000 183.9 ms 18.39 ns 51.85 M |
407 | +75.3% BM_mt_baseline_ShardedAtm64 10000000 11.66 ms 1.166 ns 817.8 M |
408 | +6670% BM_mt_cache_size64/0 10000000 450.3 ms 45.03 ns 21.18 M |
409 | +1644% BM_mt_cache_size64/10 10000000 116 ms 11.6 ns 82.2 M |
410 | + 381% BM_mt_cache_size64/100 10000000 32.04 ms 3.204 ns 297.7 M |
411 | + 129% BM_mt_cache_size64/1000 10000000 15.24 ms 1.524 ns 625.8 M |
412 | +6052% BM_mt_cache_size32/0 10000000 409.2 ms 40.92 ns 23.31 M |
413 | +1304% BM_mt_cache_size32/10 10000000 93.39 ms 9.339 ns 102.1 M |
414 | + 298% BM_mt_cache_size32/100 10000000 26.52 ms 2.651 ns 359.7 M |
415 | +68.1% BM_mt_cache_size32/1000 10000000 11.18 ms 1.118 ns 852.9 M |
416 | ------------------------------------------------------------------------------ |
417 | +10.4% Atomic_readFull 10000000 36.05 ms 3.605 ns 264.5 M |
418 | + 619% ThrCache_readFull 10000000 235.1 ms 23.51 ns 40.57 M |
419 | SLOW Sharded_readFull 1981093 2 s 1.01 us 967.3 k |
420 | * ThrCache_readFast 10000000 32.65 ms 3.265 ns 292.1 M |
421 | +10.0% Sharded_readFast 10000000 35.92 ms 3.592 ns 265.5 M |
422 | ------------------------------------------------------------------------------ |
423 | +4.54% BM_mt_baseline_Atomic_readFull 10000000 8.672 ms 867.2 ps 1.074 G |
424 | SLOW BM_mt_baseline_ThrCache_readFull 10000000 996.9 ms 99.69 ns 9.567 M |
425 | SLOW BM_mt_baseline_Sharded_readFull 10000000 891.5 ms 89.15 ns 10.7 M |
426 | * BM_mt_baseline_ThrCache_readFast 10000000 8.295 ms 829.5 ps 1.123 G |
427 | +12.7% BM_mt_baseline_Sharded_readFast 10000000 9.348 ms 934.8 ps 1020 M |
428 | ------------------------------------------------------------------------------ |
429 | */ |
430 | |