/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <stdlib.h>
#include <string.h>
#include <portability/toku_pthread.h>

#include "locktree.h"
#include "lock_request.h"

#include <util/status.h>

namespace toku {

void locktree_manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
    m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY;
    m_current_lock_memory = 0;

    m_locktree_map.create();
    m_lt_create_callback = create_cb;
    m_lt_destroy_callback = destroy_cb;
    m_lt_escalate_callback = escalate_cb;
    m_lt_escalate_callback_extra = escalate_extra;
    ZERO_STRUCT(m_mutex);
    toku_mutex_init(*manager_mutex_key, &m_mutex, nullptr);

    ZERO_STRUCT(m_lt_counters);

    escalator_init();
}

void locktree_manager::destroy(void) {
    escalator_destroy();
    invariant(m_current_lock_memory == 0);
    invariant(m_locktree_map.size() == 0);
    m_locktree_map.destroy();
    toku_mutex_destroy(&m_mutex);
}
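
// Illustrative only: a minimal sketch of the manager lifecycle, assuming the
// caller does not need the optional create/destroy/escalate callbacks (the
// create and destroy callbacks are checked for null before use in this file).
// Everything beyond the manager's own methods is hypothetical.
//
//     toku::locktree_manager mgr;
//     mgr.create(nullptr, nullptr, nullptr, nullptr);
//     // ... acquire locktrees with get_lt(), release them with release_lt() ...
//     mgr.destroy();   // requires zero lock memory and no locktrees left in the map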

void locktree_manager::mutex_lock(void) {
    toku_mutex_lock(&m_mutex);
}

void locktree_manager::mutex_unlock(void) {
    toku_mutex_unlock(&m_mutex);
}

size_t locktree_manager::get_max_lock_memory(void) {
    return m_max_lock_memory;
}

int locktree_manager::set_max_lock_memory(size_t max_lock_memory) {
    int r = 0;
    mutex_lock();
    if (max_lock_memory < m_current_lock_memory) {
        r = EDOM;
    } else {
        m_max_lock_memory = max_lock_memory;
    }
    mutex_unlock();
    return r;
}

int locktree_manager::find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id) {
    if (lt->get_dict_id().dictid < dict_id.dictid) {
        return -1;
    } else if (lt->get_dict_id().dictid == dict_id.dictid) {
        return 0;
    } else {
        return 1;
    }
}

locktree *locktree_manager::locktree_map_find(const DICTIONARY_ID &dict_id) {
    locktree *lt;
    int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(dict_id, &lt, nullptr);
    return r == 0 ? lt : nullptr;
}

void locktree_manager::locktree_map_put(locktree *lt) {
    int r = m_locktree_map.insert<DICTIONARY_ID, find_by_dict_id>(lt, lt->get_dict_id(), nullptr);
    invariant_zero(r);
}

void locktree_manager::locktree_map_remove(locktree *lt) {
    uint32_t idx;
    locktree *found_lt;
    int r = m_locktree_map.find_zero<DICTIONARY_ID, find_by_dict_id>(
        lt->get_dict_id(), &found_lt, &idx);
    invariant_zero(r);
    invariant(found_lt == lt);
    r = m_locktree_map.delete_at(idx);
    invariant_zero(r);
}

locktree *locktree_manager::get_lt(DICTIONARY_ID dict_id,
                                   const comparator &cmp, void *on_create_extra) {

    // hold the mutex around searching and maybe
    // inserting into the locktree map
    mutex_lock();

    locktree *lt = locktree_map_find(dict_id);
    if (lt == nullptr) {
        XCALLOC(lt);
        lt->create(this, dict_id, cmp);

        // new locktree created - call the on_create callback
        // and put it in the locktree map
        if (m_lt_create_callback) {
            int r = m_lt_create_callback(lt, on_create_extra);
            if (r != 0) {
                lt->release_reference();
                lt->destroy();
                toku_free(lt);
                lt = nullptr;
            }
        }
        if (lt) {
            locktree_map_put(lt);
        }
    } else {
        reference_lt(lt);
    }

    mutex_unlock();

    return lt;
}
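
// Illustrative only: the expected reference pattern for a locktree handle.
// The dictionary id, comparator, and manager pointer below are hypothetical
// placeholders, not values defined in this file.
//
//     DICTIONARY_ID dict_id = { 1 };                      // example id
//     locktree *lt = mgr->get_lt(dict_id, cmp, nullptr);  // holds a reference
//     // ... acquire and release row locks through lt ...
//     mgr->release_lt(lt);                                // drops the reference;
//                                                         // may destroy the locktree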

void locktree_manager::reference_lt(locktree *lt) {
    // increment using a sync fetch and add.
    // the caller guarantees that the lt won't be
    // destroyed while we increment the count here.
    //
    // the caller can do this by already having an lt
    // reference or by holding the manager mutex.
    //
    // if the manager's mutex is held, it is ok for the
    // reference count to transition from 0 to 1 (no race),
    // since we're serialized with other opens and closes.
    lt->add_reference();
}

void locktree_manager::release_lt(locktree *lt) {
    bool do_destroy = false;
    DICTIONARY_ID dict_id = lt->get_dict_id();

    // Release a reference on the locktree. If the count transitions to zero,
    // then we *may* need to do the cleanup.
    //
    // Grab the manager's mutex and look for a locktree with this locktree's
    // dictionary id. Since dictionary ids never get reused, any locktree
    // found must be the one we just released a reference on.
    //
    // At least two things could have happened since we got the mutex:
    //   - Another thread gets a locktree with the same dict_id and increments
    //     the reference count. In this case, we shouldn't destroy it.
    //   - Another thread gets a locktree with the same dict_id and then
    //     releases it quickly, transitioning the reference count from zero to
    //     one and back to zero. In this case, only one of us should destroy it.
    //     It doesn't matter which. We originally missed this case, see #5776.
    //
    // After 5776, the high level rule for release is described below.
    //
    // If a thread releases a locktree and notices the reference count transition
    // to zero, then that thread must immediately:
    //   - assume the locktree object is invalid
    //   - grab the manager's mutex
    //   - search the locktree map for a locktree with the same dict_id and remove
    //     it, if it exists. The destroy may be deferred.
    //   - release the manager's mutex
    //
    // This way, if many threads transition the same locktree's reference count
    // from 1 to zero and wait behind the manager's mutex, only one of them will
    // do the actual destroy and the others will happily do nothing.
    uint32_t refs = lt->release_reference();
    if (refs == 0) {
        mutex_lock();
        // lt may have already been removed and destroyed by a racing release,
        // so don't dereference the pointer. Look up the dict_id in the map instead.
        locktree *find_lt = locktree_map_find(dict_id);
        if (find_lt != nullptr) {
            // A locktree is still in the map with that dict_id, so it must be
            // equal to lt. This is true because dictionary ids are never reused.
            // If the reference count is zero, it's our responsibility to remove
            // it and do the destroy. Otherwise, someone still wants it.
            // If the locktree is still valid then check if it should be deleted.
            if (find_lt == lt) {
                if (lt->get_reference_count() == 0) {
                    locktree_map_remove(lt);
                    do_destroy = true;
                }
                m_lt_counters.add(lt->get_lock_request_info()->counters);
            }
        }
        mutex_unlock();
    }

    // if necessary, do the destroy without holding the mutex
    if (do_destroy) {
        if (m_lt_destroy_callback) {
            m_lt_destroy_callback(lt);
        }
        lt->destroy();
        toku_free(lt);
    }
}

void locktree_manager::run_escalation(void) {
    struct escalation_fn {
        static void run(void *extra) {
            locktree_manager *mgr = (locktree_manager *) extra;
            mgr->escalate_all_locktrees();
        }
    };
    m_escalator.run(this, escalation_fn::run, this);
}

// test-only version of lock escalation
void locktree_manager::run_escalation_for_test(void) {
    run_escalation();
}

void locktree_manager::escalate_all_locktrees(void) {
    uint64_t t0 = toku_current_time_microsec();

    // get all locktrees
    mutex_lock();
    int num_locktrees = m_locktree_map.size();
    locktree **locktrees = new locktree *[num_locktrees];
    for (int i = 0; i < num_locktrees; i++) {
        int r = m_locktree_map.fetch(i, &locktrees[i]);
        invariant_zero(r);
        reference_lt(locktrees[i]);
    }
    mutex_unlock();

    // escalate them
    escalate_locktrees(locktrees, num_locktrees);

    delete [] locktrees;

    uint64_t t1 = toku_current_time_microsec();
    add_escalator_wait_time(t1 - t0);
}

void locktree_manager::note_mem_used(uint64_t mem_used) {
    (void) toku_sync_fetch_and_add(&m_current_lock_memory, mem_used);
}

void locktree_manager::note_mem_released(uint64_t mem_released) {
    uint64_t old_mem_used = toku_sync_fetch_and_sub(&m_current_lock_memory, mem_released);
    invariant(old_mem_used >= mem_released);
}

bool locktree_manager::out_of_locks(void) const {
    return m_current_lock_memory >= m_max_lock_memory;
}

bool locktree_manager::over_big_threshold(void) {
    return m_current_lock_memory >= m_max_lock_memory / 2;
}
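
// Illustrative only: how the two memory thresholds relate, using a
// hypothetical limit. With a 64 MB limit, big transactions hit the "big"
// threshold (and trigger escalation in check_current_lock_constraints)
// once lock memory reaches 32 MB, while every transaction is refused new
// locks once it reaches 64 MB:
//
//     mgr->set_max_lock_memory(64UL << 20);   // 64 MB limit
//     // over_big_threshold() becomes true at >= 32 MB of lock memory
//     // out_of_locks() becomes true at >= 64 MB of lock memory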

int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callback callback,
                                                    void *extra) {
    mutex_lock();
    int r = 0;
    size_t num_locktrees = m_locktree_map.size();
    for (size_t i = 0; i < num_locktrees && r == 0; i++) {
        locktree *lt;
        r = m_locktree_map.fetch(i, &lt);
        invariant_zero(r);

        struct lt_lock_request_info *info = lt->get_lock_request_info();
        toku_mutex_lock(&info->mutex);

        size_t num_requests = info->pending_lock_requests.size();
        for (size_t k = 0; k < num_requests && r == 0; k++) {
            lock_request *req;
            r = info->pending_lock_requests.fetch(k, &req);
            invariant_zero(r);
            r = callback(lt->get_dict_id(), req->get_txnid(),
                         req->get_left_key(), req->get_right_key(),
                         req->get_conflicting_txnid(), req->get_start_time(), extra);
        }

        toku_mutex_unlock(&info->mutex);
    }
    mutex_unlock();
    return r;
}
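
// Illustrative only: a minimal sketch of a callback suitable for
// iterate_pending_lock_requests(). The authoritative typedef lives in
// lock_request.h; the function and parameter names below are hypothetical,
// with types inferred from the call site above.
//
//     static int count_pending(DICTIONARY_ID dict_id, TXNID txnid,
//                              const DBT *left_key, const DBT *right_key,
//                              TXNID conflicting_txnid, uint64_t start_time,
//                              void *extra) {
//         uint64_t *count = reinterpret_cast<uint64_t *>(extra);
//         (*count)++;
//         return 0;   // a nonzero return stops the iteration early
//     }
//
//     uint64_t count = 0;
//     mgr->iterate_pending_lock_requests(count_pending, &count);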

int locktree_manager::check_current_lock_constraints(bool big_txn) {
    int r = 0;
    if (big_txn && over_big_threshold()) {
        run_escalation();
        if (over_big_threshold()) {
            r = TOKUDB_OUT_OF_LOCKS;
        }
    }
    if (r == 0 && out_of_locks()) {
        run_escalation();
        if (out_of_locks()) {
            // return an error if we're still out of locks after escalation.
            r = TOKUDB_OUT_OF_LOCKS;
        }
    }
    return r;
}
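
// Illustrative only: how a lock acquisition path might consult the memory
// constraints before adding a new lock. The surrounding acquisition code is
// hypothetical.
//
//     int r = mgr->check_current_lock_constraints(txn_is_big);
//     if (r == TOKUDB_OUT_OF_LOCKS) {
//         return r;   // escalation already ran and memory is still exhausted
//     }
//     // ... proceed to take the lock; memory accounting goes through
//     //     note_mem_used() / note_mem_released() ...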

void locktree_manager::escalator_init(void) {
    ZERO_STRUCT(m_escalation_mutex);
    toku_mutex_init(
        *manager_escalation_mutex_key, &m_escalation_mutex, nullptr);
    m_escalation_count = 0;
    m_escalation_time = 0;
    m_wait_escalation_count = 0;
    m_wait_escalation_time = 0;
    m_long_wait_escalation_count = 0;
    m_long_wait_escalation_time = 0;
    m_escalation_latest_result = 0;
    m_escalator.create();
}

void locktree_manager::escalator_destroy(void) {
    m_escalator.destroy();
    toku_mutex_destroy(&m_escalation_mutex);
}

void locktree_manager::add_escalator_wait_time(uint64_t t) {
    toku_mutex_lock(&m_escalation_mutex);
    m_wait_escalation_count += 1;
    m_wait_escalation_time += t;
    if (t >= 1000000) {
        m_long_wait_escalation_count += 1;
        m_long_wait_escalation_time += t;
    }
    toku_mutex_unlock(&m_escalation_mutex);
}

void locktree_manager::escalate_locktrees(locktree **locktrees, int num_locktrees) {
    // there are too many row locks in the system and we need to tidy up.
    //
    // a simple implementation of escalation does not attempt
    // to reduce the memory footprint of each txn's range buffer.
    // doing so would require some layering hackery (or a callback)
    // and more complicated locking. for now, just escalate each
    // locktree individually, in-place.
    tokutime_t t0 = toku_time_now();
    for (int i = 0; i < num_locktrees; i++) {
        locktrees[i]->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra);
        release_lt(locktrees[i]);
    }
    tokutime_t t1 = toku_time_now();

    toku_mutex_lock(&m_escalation_mutex);
    m_escalation_count++;
    m_escalation_time += (t1 - t0);
    m_escalation_latest_result = m_current_lock_memory;
    toku_mutex_unlock(&m_escalation_mutex);
}

struct escalate_args {
    locktree_manager *mgr;
    locktree **locktrees;
    int num_locktrees;
};

void locktree_manager::locktree_escalator::create(void) {
    ZERO_STRUCT(m_escalator_mutex);
    toku_mutex_init(*manager_escalator_mutex_key, &m_escalator_mutex, nullptr);
    toku_cond_init(*manager_m_escalator_done_key, &m_escalator_done, nullptr);
    m_escalator_running = false;
}

void locktree_manager::locktree_escalator::destroy(void) {
    toku_cond_destroy(&m_escalator_done);
    toku_mutex_destroy(&m_escalator_mutex);
}

void locktree_manager::locktree_escalator::run(locktree_manager *mgr, void (*escalate_locktrees_fun)(void *extra), void *extra) {
    uint64_t t0 = toku_current_time_microsec();
    toku_mutex_lock(&m_escalator_mutex);
    if (!m_escalator_running) {
        // run escalation on this thread
        m_escalator_running = true;
        toku_mutex_unlock(&m_escalator_mutex);
        escalate_locktrees_fun(extra);
        toku_mutex_lock(&m_escalator_mutex);
        m_escalator_running = false;
        toku_cond_broadcast(&m_escalator_done);
    } else {
        toku_cond_wait(&m_escalator_done, &m_escalator_mutex);
    }
    toku_mutex_unlock(&m_escalator_mutex);
    uint64_t t1 = toku_current_time_microsec();
    mgr->add_escalator_wait_time(t1 - t0);
}
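
// Illustrative only: the intended concurrency pattern for the escalator.
// Many threads may call run_escalation() at once; exactly one performs the
// escalation pass while the rest block on m_escalator_done and return once
// that single pass completes. A sketch of two concurrent callers (the thread
// labels are hypothetical):
//
//     // thread A: sees m_escalator_running == false and runs escalation itself
//     mgr->run_escalation();
//     // thread B: arrives while A is escalating, waits on m_escalator_done,
//     // then returns without escalating again
//     mgr->run_escalation();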

void locktree_manager::get_status(LTM_STATUS statp) {
    ltm_status.init();
    LTM_STATUS_VAL(LTM_SIZE_CURRENT) = m_current_lock_memory;
    LTM_STATUS_VAL(LTM_SIZE_LIMIT) = m_max_lock_memory;
    LTM_STATUS_VAL(LTM_ESCALATION_COUNT) = m_escalation_count;
    LTM_STATUS_VAL(LTM_ESCALATION_TIME) = m_escalation_time;
    LTM_STATUS_VAL(LTM_ESCALATION_LATEST_RESULT) = m_escalation_latest_result;
    LTM_STATUS_VAL(LTM_WAIT_ESCALATION_COUNT) = m_wait_escalation_count;
    LTM_STATUS_VAL(LTM_WAIT_ESCALATION_TIME) = m_wait_escalation_time;
    LTM_STATUS_VAL(LTM_LONG_WAIT_ESCALATION_COUNT) = m_long_wait_escalation_count;
    LTM_STATUS_VAL(LTM_LONG_WAIT_ESCALATION_TIME) = m_long_wait_escalation_time;

    uint64_t lock_requests_pending = 0;
    uint64_t sto_num_eligible = 0;
    uint64_t sto_end_early_count = 0;
    tokutime_t sto_end_early_time = 0;
    size_t num_locktrees = 0;
    struct lt_counters lt_counters = {};

    if (toku_mutex_trylock(&m_mutex) == 0) {
        lt_counters = m_lt_counters;
        num_locktrees = m_locktree_map.size();
        for (size_t i = 0; i < num_locktrees; i++) {
            locktree *lt;
            int r = m_locktree_map.fetch(i, &lt);
            invariant_zero(r);
            if (toku_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) {
                lock_requests_pending += lt->m_lock_request_info.pending_lock_requests.size();
                lt_counters.add(lt->get_lock_request_info()->counters);
                toku_mutex_unlock(&lt->m_lock_request_info.mutex);
            }
            sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0;
            sto_end_early_count += lt->m_sto_end_early_count;
            sto_end_early_time += lt->m_sto_end_early_time;
        }
        mutex_unlock();
    }

    LTM_STATUS_VAL(LTM_NUM_LOCKTREES) = num_locktrees;
    LTM_STATUS_VAL(LTM_LOCK_REQUESTS_PENDING) = lock_requests_pending;
    LTM_STATUS_VAL(LTM_STO_NUM_ELIGIBLE) = sto_num_eligible;
    LTM_STATUS_VAL(LTM_STO_END_EARLY_COUNT) = sto_end_early_count;
    LTM_STATUS_VAL(LTM_STO_END_EARLY_TIME) = sto_end_early_time;
    LTM_STATUS_VAL(LTM_WAIT_COUNT) = lt_counters.wait_count;
    LTM_STATUS_VAL(LTM_WAIT_TIME) = lt_counters.wait_time;
    LTM_STATUS_VAL(LTM_LONG_WAIT_COUNT) = lt_counters.long_wait_count;
    LTM_STATUS_VAL(LTM_LONG_WAIT_TIME) = lt_counters.long_wait_time;
    LTM_STATUS_VAL(LTM_TIMEOUT_COUNT) = lt_counters.timeout_count;
    *statp = ltm_status;
}

void locktree_manager::kill_waiter(void *extra) {
    mutex_lock();
    int r = 0;
    size_t num_locktrees = m_locktree_map.size();
    for (size_t i = 0; i < num_locktrees; i++) {
        locktree *lt;
        r = m_locktree_map.fetch(i, &lt);
        invariant_zero(r);
        lock_request::kill_waiter(lt, extra);
    }
    mutex_unlock();
}

} /* namespace toku */