#include "RWLock.h"
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>


namespace ProfileEvents
{
    extern const Event RWLockAcquiredReadLocks;
    extern const Event RWLockAcquiredWriteLocks;
    extern const Event RWLockReadersWaitMilliseconds;
    extern const Event RWLockWritersWaitMilliseconds;
}


namespace CurrentMetrics
{
    extern const Metric RWLockWaitingReaders;
    extern const Metric RWLockWaitingWriters;
    extern const Metric RWLockActiveReaders;
    extern const Metric RWLockActiveWriters;
}


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int DEADLOCK_AVOIDED;
}

/** A single-use object that represents a lock's ownership.
 *  To provide exception-safety guarantees, a LockHolder is used in two steps:
 *    1. Create an instance (allocating all the memory needed)
 *    2. Associate the instance with the lock (attach it to the lock and to the locking request group)
 */
class RWLockImpl::LockHolderImpl
{
    bool bound{false};
    Type lock_type;
    String query_id;
    CurrentMetrics::Increment active_client_increment;
    RWLock parent;
    GroupsContainer::iterator it_group;

public:
    LockHolderImpl(const LockHolderImpl & other) = delete;
    LockHolderImpl & operator=(const LockHolderImpl & other) = delete;

    /// Implicit memory allocation for query_id is done here
    LockHolderImpl(const String & query_id_, Type type)
        : lock_type{type}, query_id{query_id_},
          active_client_increment{
              type == Type::Read ? CurrentMetrics::RWLockActiveReaders : CurrentMetrics::RWLockActiveWriters}
    {
    }

    ~LockHolderImpl();

private:
    /// A separate method which binds the lock holder to the owned lock
    /// N.B. It is very important that this method produces no allocations
    bool bind_with(RWLock && parent_, GroupsContainer::iterator it_group_) noexcept
    {
        if (bound)
            return false;
        it_group = it_group_;
        parent = std::move(parent_);
        ++it_group->refererrs;
        bound = true;
        return true;
    }

    friend class RWLockImpl;
};
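
/** An illustrative sketch of the two-step pattern described above (not code from this file;
 *  it mirrors how RWLockImpl::getLock() below actually uses the holder):
 *
 *      /// Step 1: all allocations happen here, before the lock's state is touched
 *      auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);
 *      ...
 *      /// Step 2: nothrow association; if anything before this point threw,
 *      /// the lock's internal state was never modified and remains consistent
 *      lock_holder->bind_with(shared_from_this(), it_group);
 */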


namespace
{
    /// Global information about all read locks that a query holds. It is needed to avoid
    /// a certain type of deadlock (see the comment in getLock() below).

    class QueryLockInfo
    {
    private:
        mutable std::mutex mutex;
        std::map<std::string, size_t> queries;

    public:
        void add(const String & query_id)
        {
            std::lock_guard lock(mutex);

            const auto res = queries.emplace(query_id, 1);  /// may throw
            if (!res.second)
                ++res.first->second;
        }

        void remove(const String & query_id) noexcept
        {
            std::lock_guard lock(mutex);

            const auto query_it = queries.find(query_id);
            if (query_it != queries.cend() && --query_it->second == 0)
                queries.erase(query_it);
        }

        void check(const String & query_id) const
        {
            std::lock_guard lock(mutex);

            if (queries.find(query_id) != queries.cend())
                throw Exception("Possible deadlock avoided. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED);
        }
    };

    QueryLockInfo all_read_locks;
}
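
/** How all_read_locks is threaded through a lock's lifecycle (an illustrative summary
 *  assembled from the code below, not an additional API):
 *
 *      all_read_locks.add(query_id);     /// on acquiring a read lock (fast and slow paths of getLock)
 *      all_read_locks.check(query_id);   /// before a read request would have to wait; throws
 *                                        /// DEADLOCK_AVOIDED if this query already holds read locks
 *      all_read_locks.remove(query_id);  /// on rollback in getLock and in ~LockHolderImpl()
 */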


/** To guarantee that we do not get any piece of our data corrupted:
 *  1. Perform all actions that involve allocations before changing the lock's internal state
 *  2. Roll back any changes that would leave the state inconsistent
 *
 * Note: "SM" in the comments below stands for STATE MODIFICATION
 */
RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
{
    const bool request_has_query_id = query_id != NO_QUERY;

    Stopwatch watch(CLOCK_MONOTONIC_COARSE);
    CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
                                                                      : CurrentMetrics::RWLockWaitingWriters);
    auto finalize_metrics = [type, &watch] ()
    {
        ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockAcquiredReadLocks
                                                : ProfileEvents::RWLockAcquiredWriteLocks);
        ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockReadersWaitMilliseconds
                                                : ProfileEvents::RWLockWritersWaitMilliseconds, watch.elapsedMilliseconds());
    };

    /// This object is constructed before the unique_lock because its destructor may need to acquire the mutex.
    auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);

    std::unique_lock lock(mutex);

    /// The fast path:
    /// if the same query_id already holds the required lock, we can proceed without waiting
    if (request_has_query_id)
    {
        const auto it_query = owner_queries.find(query_id);
        if (it_query != owner_queries.end())
        {
            const auto current_owner_group = queue.begin();

            /// XXX: this means we cannot upgrade a lock from Read to Write!
            if (type == Write)
                throw Exception(
                    "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
                    ErrorCodes::LOGICAL_ERROR);

            if (current_owner_group->type == Write)
                throw Exception(
                    "RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
                    ErrorCodes::LOGICAL_ERROR);

            /// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
            all_read_locks.add(query_id);                                     /// SM1: may throw on insertion (nothing to roll back)
            ++it_query->second;                                               /// SM2: nothrow
            lock_holder->bind_with(shared_from_this(), current_owner_group);  /// SM3: nothrow

            finalize_metrics();
            return lock_holder;
        }
    }

    /** If the query already holds any active read lock and tries to acquire another read lock
     *  that is not at the front of the queue and has to wait, a deadlock is possible:
     *
     *  Example (four queries, two RWLocks - 'a' and 'b'):
     *
     *      --> time -->
     *
     *      q1: ra          rb
     *      q2:    wa
     *      q3:       rb       ra
     *      q4:          wb
     *
     *  q1's rb must queue behind q4's wb, which waits for q3's rb; q3's ra must queue behind
     *  q2's wa, which waits for q1's ra - a cycle. We throw an exception instead of waiting.
     */
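
    /** What a client observes in the scenario above (an illustrative, hypothetical caller;
     *  DEADLOCK_AVOIDED signals that the client should release its locks and retry):
     *
     *      try
     *      {
     *          auto holder = lock_b->getLock(RWLockImpl::Read, query_id);  /// would have to wait behind a writer
     *      }
     *      catch (const Exception & e)
     *      {
     *          if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
     *              ...  /// retry the whole operation instead of deadlocking
     *          else
     *              throw;
     *      }
     */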

    if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
    {
        if (type == Type::Read && request_has_query_id && !queue.empty())
            all_read_locks.check(query_id);

        /// Create a new group of locking requests
        queue.emplace_back(type);  /// SM1: may throw (nothing to roll back)
    }
    else if (request_has_query_id && queue.size() > 1)
        all_read_locks.check(query_id);

    GroupsContainer::iterator it_group = std::prev(queue.end());

    /// We need to reference the associated group before waiting to guarantee
    /// that this group does not get deleted prematurely
    ++it_group->refererrs;

    /// Wait until we are notified and our group reaches the head of the queue
    it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });

    --it_group->refererrs;

    if (request_has_query_id)
    {
        try
        {
            if (type == Type::Read)
                all_read_locks.add(query_id);  /// SM2: may throw on insertion
                                               ///      and is safe to roll back unconditionally
            const auto emplace_res =
                owner_queries.emplace(query_id, 1);  /// SM3: may throw on insertion
            if (!emplace_res.second)
                ++emplace_res.first->second;  /// SM4: nothrow
        }
        catch (...)
        {
            /// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
            /// We only need to roll back the changes to these objects: all_read_locks and the locking queue
            if (type == Type::Read)
                all_read_locks.remove(query_id);  /// Rollback(SM2): nothrow

            if (it_group->refererrs == 0)
            {
                const auto next = queue.erase(it_group);  /// Rollback(SM1): nothrow
                if (next != queue.end())
                    next->cv.notify_all();
            }

            throw;
        }
    }

    lock_holder->bind_with(shared_from_this(), it_group);  /// SM: nothrow

    finalize_metrics();
    return lock_holder;
}
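
/** A minimal usage sketch (hypothetical caller code; assumes the create() factory
 *  declared in RWLock.h):
 *
 *      RWLock rw_lock = RWLockImpl::create();
 *      auto read1 = rw_lock->getLock(RWLockImpl::Read, "query_1");
 *      auto read2 = rw_lock->getLock(RWLockImpl::Read, "query_1");  /// fast path: same query_id, no waiting
 *      read1.reset();  /// releases are reference-counted per group and per query
 *      read2.reset();  /// the last release removes the group and notifies the next one
 *      auto write = rw_lock->getLock(RWLockImpl::Write, "query_1");
 */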


/** The sequence points of acquiring lock ownership by an instance of LockHolderImpl:
 *    1. all_read_locks is updated
 *    2. owner_queries is updated
 *    3. the request group is updated by LockHolderImpl, which in turn becomes "bound"
 *
 * If the instance has been "bound" by the time the destructor of LockHolderImpl is called,
 * it is guaranteed that all three steps have been executed successfully and the resulting state is consistent.
 * With the mutex locked, the order of the steps that restore the lock's state can be arbitrary.
 *
 * We do not employ try-catch: if something bad happens, there is nothing we can do =(
 */
RWLockImpl::LockHolderImpl::~LockHolderImpl()
{
    if (!bound || parent == nullptr)
        return;

    std::lock_guard lock(parent->mutex);

    /// The associated group must exist (and be at the head of the queue)
    if (parent->queue.empty() || it_group != parent->queue.begin())
        return;

    /// If query_id is not empty it must be listed in parent->owner_queries
    if (query_id != RWLockImpl::NO_QUERY)
    {
        const auto owner_it = parent->owner_queries.find(query_id);
        if (owner_it != parent->owner_queries.end())
        {
            if (--owner_it->second == 0)                /// SM: nothrow
                parent->owner_queries.erase(owner_it);  /// SM: nothrow

            if (lock_type == RWLockImpl::Read)
                all_read_locks.remove(query_id);  /// SM: nothrow
        }
    }

    /// If we are the last remaining referrer, remove the group and notify the next group
    if (--it_group->refererrs == 0)  /// SM: nothrow
    {
        const auto next = parent->queue.erase(it_group);  /// SM: nothrow
        if (next != parent->queue.end())
            next->cv.notify_all();
    }
}

}