1 | #include <gtest/gtest.h> |
2 | |
3 | #include <Common/Exception.h> |
4 | #include <Common/RWLock.h> |
5 | #include <Common/Stopwatch.h> |
6 | #include <common/Types.h> |
7 | #include <Common/ThreadPool.h> |
8 | #include <random> |
9 | #include <pcg_random.hpp> |
10 | #include <thread> |
11 | #include <atomic> |
12 | #include <iomanip> |
13 | |
14 | |
15 | using namespace DB; |
16 | |
/// Forward declaration of the error code that the deadlock test below compares against.
/// The definition is expected to live elsewhere in the project.
namespace DB::ErrorCodes
{
    extern const int DEADLOCK_AVOIDED;
}
24 | |
25 | |
26 | TEST(Common, RWLock_1) |
27 | { |
28 | constexpr int cycles = 1000; |
29 | const std::vector<size_t> pool_sizes{1, 2, 4, 8}; |
30 | |
31 | static std::atomic<int> readers{0}; |
32 | static std::atomic<int> writers{0}; |
33 | |
34 | static auto fifo_lock = RWLockImpl::create(); |
35 | |
36 | static thread_local std::random_device rd; |
37 | static thread_local pcg64 gen(rd()); |
38 | |
39 | auto func = [&] (size_t threads, int round) |
40 | { |
41 | for (int i = 0; i < cycles; ++i) |
42 | { |
43 | auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockImpl::Read : RWLockImpl::Write; |
44 | auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen)); |
45 | |
46 | auto lock = fifo_lock->getLock(type, RWLockImpl::NO_QUERY); |
47 | |
48 | if (type == RWLockImpl::Write) |
49 | { |
50 | ++writers; |
51 | |
52 | ASSERT_EQ(writers, 1); |
53 | ASSERT_EQ(readers, 0); |
54 | |
55 | std::this_thread::sleep_for(sleep_for); |
56 | |
57 | --writers; |
58 | } |
59 | else |
60 | { |
61 | ++readers; |
62 | |
63 | ASSERT_EQ(writers, 0); |
64 | ASSERT_GE(readers, 1); |
65 | ASSERT_LE(readers, threads); |
66 | |
67 | std::this_thread::sleep_for(sleep_for); |
68 | |
69 | --readers; |
70 | } |
71 | } |
72 | }; |
73 | |
74 | for (auto pool_size : pool_sizes) |
75 | { |
76 | for (int round = 0; round < 10; ++round) |
77 | { |
78 | Stopwatch watch(CLOCK_MONOTONIC_COARSE); |
79 | |
80 | std::list<std::thread> threads; |
81 | for (size_t thread = 0; thread < pool_size; ++thread) |
82 | threads.emplace_back([=] () { func(pool_size, round); }); |
83 | |
84 | for (auto & thread : threads) |
85 | thread.join(); |
86 | |
87 | auto total_time = watch.elapsedSeconds(); |
88 | std::cout << "Threads " << pool_size << ", round " << round << ", total_time " << std::setprecision(2) << total_time << "\n" ; |
89 | } |
90 | } |
91 | } |
92 | |
93 | TEST(Common, RWLock_Recursive) |
94 | { |
95 | constexpr auto cycles = 10000; |
96 | |
97 | static auto fifo_lock = RWLockImpl::create(); |
98 | |
99 | static thread_local std::random_device rd; |
100 | static thread_local pcg64 gen(rd()); |
101 | |
102 | std::thread t1([&] () |
103 | { |
104 | for (int i = 0; i < 2 * cycles; ++i) |
105 | { |
106 | auto lock = fifo_lock->getLock(RWLockImpl::Write, "q1" ); |
107 | |
108 | auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen)); |
109 | std::this_thread::sleep_for(sleep_for); |
110 | } |
111 | }); |
112 | |
113 | std::thread t2([&] () |
114 | { |
115 | for (int i = 0; i < cycles; ++i) |
116 | { |
117 | auto lock1 = fifo_lock->getLock(RWLockImpl::Read, "q2" ); |
118 | |
119 | auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen)); |
120 | std::this_thread::sleep_for(sleep_for); |
121 | |
122 | auto lock2 = fifo_lock->getLock(RWLockImpl::Read, "q2" ); |
123 | |
124 | EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, "q2" );}); |
125 | } |
126 | |
127 | fifo_lock->getLock(RWLockImpl::Write, "q2" ); |
128 | }); |
129 | |
130 | t1.join(); |
131 | t2.join(); |
132 | } |
133 | |
134 | |
135 | TEST(Common, RWLock_Deadlock) |
136 | { |
137 | static auto lock1 = RWLockImpl::create(); |
138 | static auto lock2 = RWLockImpl::create(); |
139 | |
140 | /** |
141 | * q1: r1 r2 |
142 | * q2: w1 |
143 | * q3: r2 r1 |
144 | * q4: w2 |
145 | */ |
146 | |
147 | std::thread t1([&] () |
148 | { |
149 | auto holder1 = lock1->getLock(RWLockImpl::Read, "q1" ); |
150 | usleep(100000); |
151 | usleep(100000); |
152 | usleep(100000); |
153 | try |
154 | { |
155 | auto holder2 = lock2->getLock(RWLockImpl::Read, "q1" ); |
156 | } |
157 | catch (const Exception & e) |
158 | { |
159 | if (e.code() != ErrorCodes::DEADLOCK_AVOIDED) |
160 | throw; |
161 | } |
162 | }); |
163 | |
164 | std::thread t2([&] () |
165 | { |
166 | usleep(100000); |
167 | auto holder1 = lock1->getLock(RWLockImpl::Write, "q2" ); |
168 | }); |
169 | |
170 | std::thread t3([&] () |
171 | { |
172 | usleep(100000); |
173 | usleep(100000); |
174 | auto holder2 = lock2->getLock(RWLockImpl::Read, "q3" ); |
175 | usleep(100000); |
176 | usleep(100000); |
177 | try |
178 | { |
179 | auto holder1 = lock1->getLock(RWLockImpl::Read, "q3" ); |
180 | } |
181 | catch (const Exception & e) |
182 | { |
183 | if (e.code() != ErrorCodes::DEADLOCK_AVOIDED) |
184 | throw; |
185 | } |
186 | }); |
187 | |
188 | std::thread t4([&] () |
189 | { |
190 | usleep(100000); |
191 | usleep(100000); |
192 | usleep(100000); |
193 | auto holder2 = lock2->getLock(RWLockImpl::Write, "q4" ); |
194 | }); |
195 | |
196 | t1.join(); |
197 | t2.join(); |
198 | t3.join(); |
199 | t4.join(); |
200 | } |
201 | |
202 | |
203 | TEST(Common, RWLock_PerfTest_Readers) |
204 | { |
205 | constexpr int cycles = 100000; // 100k |
206 | const std::vector<size_t> pool_sizes{1, 2, 4, 8}; |
207 | |
208 | static auto fifo_lock = RWLockImpl::create(); |
209 | |
210 | for (auto pool_size : pool_sizes) |
211 | { |
212 | Stopwatch watch(CLOCK_MONOTONIC_COARSE); |
213 | |
214 | auto func = [&] () |
215 | { |
216 | for (auto i = 0; i < cycles; ++i) |
217 | { |
218 | auto lock = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY); |
219 | } |
220 | }; |
221 | |
222 | std::list<std::thread> threads; |
223 | for (size_t thread = 0; thread < pool_size; ++thread) |
224 | threads.emplace_back(func); |
225 | |
226 | for (auto & thread : threads) |
227 | thread.join(); |
228 | |
229 | auto total_time = watch.elapsedSeconds(); |
230 | std::cout << "Threads " << pool_size << ", total_time " << std::setprecision(2) << total_time << "\n" ; |
231 | } |
232 | } |
233 | |