1#include <gtest/gtest.h>
2
3#include <Common/Exception.h>
4#include <Common/RWLock.h>
5#include <Common/Stopwatch.h>
6#include <common/Types.h>
7#include <Common/ThreadPool.h>
8#include <random>
9#include <pcg_random.hpp>
10#include <thread>
11#include <atomic>
12#include <iomanip>
13
14
15using namespace DB;
16
17namespace DB
18{
19 namespace ErrorCodes
20 {
21 extern const int DEADLOCK_AVOIDED;
22 }
23}
24
25
26TEST(Common, RWLock_1)
27{
28 constexpr int cycles = 1000;
29 const std::vector<size_t> pool_sizes{1, 2, 4, 8};
30
31 static std::atomic<int> readers{0};
32 static std::atomic<int> writers{0};
33
34 static auto fifo_lock = RWLockImpl::create();
35
36 static thread_local std::random_device rd;
37 static thread_local pcg64 gen(rd());
38
39 auto func = [&] (size_t threads, int round)
40 {
41 for (int i = 0; i < cycles; ++i)
42 {
43 auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockImpl::Read : RWLockImpl::Write;
44 auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
45
46 auto lock = fifo_lock->getLock(type, RWLockImpl::NO_QUERY);
47
48 if (type == RWLockImpl::Write)
49 {
50 ++writers;
51
52 ASSERT_EQ(writers, 1);
53 ASSERT_EQ(readers, 0);
54
55 std::this_thread::sleep_for(sleep_for);
56
57 --writers;
58 }
59 else
60 {
61 ++readers;
62
63 ASSERT_EQ(writers, 0);
64 ASSERT_GE(readers, 1);
65 ASSERT_LE(readers, threads);
66
67 std::this_thread::sleep_for(sleep_for);
68
69 --readers;
70 }
71 }
72 };
73
74 for (auto pool_size : pool_sizes)
75 {
76 for (int round = 0; round < 10; ++round)
77 {
78 Stopwatch watch(CLOCK_MONOTONIC_COARSE);
79
80 std::list<std::thread> threads;
81 for (size_t thread = 0; thread < pool_size; ++thread)
82 threads.emplace_back([=] () { func(pool_size, round); });
83
84 for (auto & thread : threads)
85 thread.join();
86
87 auto total_time = watch.elapsedSeconds();
88 std::cout << "Threads " << pool_size << ", round " << round << ", total_time " << std::setprecision(2) << total_time << "\n";
89 }
90 }
91}
92
93TEST(Common, RWLock_Recursive)
94{
95 constexpr auto cycles = 10000;
96
97 static auto fifo_lock = RWLockImpl::create();
98
99 static thread_local std::random_device rd;
100 static thread_local pcg64 gen(rd());
101
102 std::thread t1([&] ()
103 {
104 for (int i = 0; i < 2 * cycles; ++i)
105 {
106 auto lock = fifo_lock->getLock(RWLockImpl::Write, "q1");
107
108 auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
109 std::this_thread::sleep_for(sleep_for);
110 }
111 });
112
113 std::thread t2([&] ()
114 {
115 for (int i = 0; i < cycles; ++i)
116 {
117 auto lock1 = fifo_lock->getLock(RWLockImpl::Read, "q2");
118
119 auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
120 std::this_thread::sleep_for(sleep_for);
121
122 auto lock2 = fifo_lock->getLock(RWLockImpl::Read, "q2");
123
124 EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, "q2");});
125 }
126
127 fifo_lock->getLock(RWLockImpl::Write, "q2");
128 });
129
130 t1.join();
131 t2.join();
132}
133
134
135TEST(Common, RWLock_Deadlock)
136{
137 static auto lock1 = RWLockImpl::create();
138 static auto lock2 = RWLockImpl::create();
139
140 /**
141 * q1: r1 r2
142 * q2: w1
143 * q3: r2 r1
144 * q4: w2
145 */
146
147 std::thread t1([&] ()
148 {
149 auto holder1 = lock1->getLock(RWLockImpl::Read, "q1");
150 usleep(100000);
151 usleep(100000);
152 usleep(100000);
153 try
154 {
155 auto holder2 = lock2->getLock(RWLockImpl::Read, "q1");
156 }
157 catch (const Exception & e)
158 {
159 if (e.code() != ErrorCodes::DEADLOCK_AVOIDED)
160 throw;
161 }
162 });
163
164 std::thread t2([&] ()
165 {
166 usleep(100000);
167 auto holder1 = lock1->getLock(RWLockImpl::Write, "q2");
168 });
169
170 std::thread t3([&] ()
171 {
172 usleep(100000);
173 usleep(100000);
174 auto holder2 = lock2->getLock(RWLockImpl::Read, "q3");
175 usleep(100000);
176 usleep(100000);
177 try
178 {
179 auto holder1 = lock1->getLock(RWLockImpl::Read, "q3");
180 }
181 catch (const Exception & e)
182 {
183 if (e.code() != ErrorCodes::DEADLOCK_AVOIDED)
184 throw;
185 }
186 });
187
188 std::thread t4([&] ()
189 {
190 usleep(100000);
191 usleep(100000);
192 usleep(100000);
193 auto holder2 = lock2->getLock(RWLockImpl::Write, "q4");
194 });
195
196 t1.join();
197 t2.join();
198 t3.join();
199 t4.join();
200}
201
202
203TEST(Common, RWLock_PerfTest_Readers)
204{
205 constexpr int cycles = 100000; // 100k
206 const std::vector<size_t> pool_sizes{1, 2, 4, 8};
207
208 static auto fifo_lock = RWLockImpl::create();
209
210 for (auto pool_size : pool_sizes)
211 {
212 Stopwatch watch(CLOCK_MONOTONIC_COARSE);
213
214 auto func = [&] ()
215 {
216 for (auto i = 0; i < cycles; ++i)
217 {
218 auto lock = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY);
219 }
220 };
221
222 std::list<std::thread> threads;
223 for (size_t thread = 0; thread < pool_size; ++thread)
224 threads.emplace_back(func);
225
226 for (auto & thread : threads)
227 thread.join();
228
229 auto total_time = watch.elapsedSeconds();
230 std::cout << "Threads " << pool_size << ", total_time " << std::setprecision(2) << total_time << "\n";
231 }
232}
233