/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <memory.h>

#include <util/growable_array.h>

#include <portability/toku_pthread.h>
#include <portability/toku_time.h>

#include "locktree.h"
#include "range_buffer.h"

// including the concurrent_tree here expands the templates
// and "defines" the implementation, so we do it here in
// the locktree source file instead of the header.
#include "concurrent_tree.h"

namespace toku {

// A locktree represents the set of row locks owned by all transactions
// over an open dictionary. Read and write ranges are represented as
// a left and right key which are compared with the given descriptor
// and comparison fn.
//
// Each locktree has a reference count which it manages
// but does nothing based on the value of the reference count - it is
// up to the user of the locktree to destroy it when it sees fit.

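// A sketch of the typical flow (illustrative only, based on the functions
// defined in this file): a caller takes row locks with acquire_read_lock()
// or acquire_write_lock(), remembers the ranges it was granted in a
// range_buffer, and later hands that buffer to release_locks() to drop
// everything it holds at once.
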
void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) {
    m_mgr = mgr;
    m_dict_id = dict_id;

    m_cmp.create_from(cmp);
    m_reference_count = 1;
    m_userdata = nullptr;

    XCALLOC(m_rangetree);
    m_rangetree->create(&m_cmp);

    m_sto_txnid = TXNID_NONE;
    m_sto_buffer.create();
    m_sto_score = STO_SCORE_THRESHOLD;
    m_sto_end_early_count = 0;
    m_sto_end_early_time = 0;

    m_lock_request_info.init();
}

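// set up the pending lock request set, the retry bookkeeping, and the
// mutexes and condition variable that protect them.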
void lt_lock_request_info::init(void) {
    pending_lock_requests.create();
    pending_is_empty = true;
    ZERO_STRUCT(mutex);
    toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr);
    retry_want = retry_done = 0;
    ZERO_STRUCT(counters);
    ZERO_STRUCT(retry_mutex);
    toku_mutex_init(
        *locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
    toku_cond_init(*locktree_request_info_retry_cv_key, &retry_cv, nullptr);
    running_retry = false;

    TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
                                      sizeof(pending_is_empty));
    TOKU_DRD_IGNORE_VAR(pending_is_empty);
}

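// destroy requires that no transactions still hold references to this
// locktree and that no lock requests are pending against it.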
void locktree::destroy(void) {
    invariant(m_reference_count == 0);
    invariant(m_lock_request_info.pending_lock_requests.size() == 0);
    m_cmp.destroy();
    m_rangetree->destroy();
    toku_free(m_rangetree);
    m_sto_buffer.destroy();
    m_lock_request_info.destroy();
}

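// tear down everything that init() created.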
void lt_lock_request_info::destroy(void) {
    pending_lock_requests.destroy();
    toku_mutex_destroy(&mutex);
    toku_mutex_destroy(&retry_mutex);
    toku_cond_destroy(&retry_cv);
}

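// the reference count is maintained with atomic operations. as noted above,
// the locktree itself takes no action when the count drops to zero; its
// user decides when to actually destroy it.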
void locktree::add_reference(void) {
    (void) toku_sync_add_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::release_reference(void) {
    return toku_sync_sub_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::get_reference_count(void) {
    return m_reference_count;
}

// a container for a range/txnid pair
struct row_lock {
    keyrange range;
    TXNID txnid;
};

// iterate over a locked keyrange and copy out all of the data,
// storing each row lock into the given growable array. the
// caller does not own the range inside the returned row locks,
// so remove from the tree with care using them as keys.
static void iterate_and_get_overlapping_row_locks(const concurrent_tree::locked_keyrange *lkr,
                                                  GrowableArray<row_lock> *row_locks) {
    struct copy_fn_obj {
        GrowableArray<row_lock> *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            row_lock lock = { .range = range, .txnid = txnid };
            row_locks->push(lock);
            return true;
        }
    } copy_fn;
    copy_fn.row_locks = row_locks;
    lkr->iterate(&copy_fn);
}

// given a txnid and a set of overlapping row locks, determine
// which txnids are conflicting, and store them in the conflicts
// set, if given.
static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_locks,
                                         const TXNID &txnid, txnid_set *conflicts) {
    bool conflicts_exist = false;
    const size_t num_overlaps = row_locks.get_size();
    for (size_t i = 0; i < num_overlaps; i++) {
        const row_lock lock = row_locks.fetch_unchecked(i);
        const TXNID other_txnid = lock.txnid;
        if (other_txnid != txnid) {
            if (conflicts) {
                conflicts->add(other_txnid);
            }
            conflicts_exist = true;
        }
    }
    return conflicts_exist;
}

// how much memory does a row lock take up in a concurrent tree?
static uint64_t row_lock_size_in_tree(const row_lock &lock) {
    const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
    return lock.range.get_memory_size() + overhead;
}

// remove and destroy the given row lock from the locked keyrange,
// then notify the memory tracker of the newly freed lock.
static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    const uint64_t mem_released = row_lock_size_in_tree(lock);
    lkr->remove(lock.range);
    if (mgr != nullptr) {
        mgr->note_mem_released(mem_released);
    }
}

// insert a row lock into the locked keyrange, then notify
// the memory tracker of this newly acquired lock.
static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    uint64_t mem_used = row_lock_size_in_tree(lock);
    lkr->insert(lock.range, lock.txnid);
    if (mgr != nullptr) {
        mgr->note_mem_used(mem_used);
    }
}

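// The sto_* functions below implement the single txnid optimization (STO):
// while exactly one transaction is taking locks on this locktree, its ranges
// are appended to a cheap buffer (m_sto_buffer) instead of being inserted
// into the rangetree. The buffered ranges are migrated into the rangetree as
// soon as a second txnid shows up or the buffer grows too large, and the
// whole buffer can be dropped at once when that single txnid releases.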
void locktree::sto_begin(TXNID txnid) {
    invariant(m_sto_txnid == TXNID_NONE);
    invariant(m_sto_buffer.is_empty());
    m_sto_txnid = txnid;
}

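// append a range to the single txnid's buffer, charging the memory
// delta to the manager if there is one.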
void locktree::sto_append(const DBT *left_key, const DBT *right_key) {
    uint64_t buffer_mem, delta;
    keyrange range;
    range.create(left_key, right_key);

    buffer_mem = m_sto_buffer.total_memory_size();
    m_sto_buffer.append(left_key, right_key);
    delta = m_sto_buffer.total_memory_size() - buffer_mem;
    if (m_mgr != nullptr) {
        m_mgr->note_mem_used(delta);
    }
}

void locktree::sto_end(void) {
    uint64_t mem_size = m_sto_buffer.total_memory_size();
    if (m_mgr != nullptr) {
        m_mgr->note_mem_released(mem_size);
    }
    m_sto_buffer.destroy();
    m_sto_buffer.create();
    m_sto_txnid = TXNID_NONE;
}

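// end the optimization early: migrate the buffered ranges into the rangetree
// on behalf of the single txnid, then reset the score so we don't immediately
// try the optimization again.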
void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
    sto_migrate_buffer_ranges_to_tree(prepared_lkr);
    sto_end();
    toku_unsafe_set(m_sto_score, 0);
}

void locktree::sto_end_early(void *prepared_lkr) {
    m_sto_end_early_count++;

    tokutime_t t0 = toku_time_now();
    sto_end_early_no_accounting(prepared_lkr);
    tokutime_t t1 = toku_time_now();

    m_sto_end_early_time += (t1 - t0);
}

void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
    // There should be something to migrate, and nothing in the rangetree.
    invariant(!m_sto_buffer.is_empty());
    invariant(m_rangetree->is_empty());

    concurrent_tree sto_rangetree;
    concurrent_tree::locked_keyrange sto_lkr;
    sto_rangetree.create(&m_cmp);

    // insert all of the ranges from the single txnid buffer into a new rangetree
    range_buffer::iterator iter(&m_sto_buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
        sto_lkr.prepare(&sto_rangetree);
        int r = acquire_lock_consolidated(&sto_lkr,
                m_sto_txnid, rec.get_left_key(), rec.get_right_key(), nullptr);
        invariant_zero(r);
        sto_lkr.release();
        iter.next();
    }

    // Iterate the newly created rangetree and insert each range into the
    // locktree's rangetree, on behalf of the old single txnid.
    struct migrate_fn_obj {
        concurrent_tree::locked_keyrange *dst_lkr;
        bool fn(const keyrange &range, TXNID txnid) {
            dst_lkr->insert(range, txnid);
            return true;
        }
    } migrate_fn;
    migrate_fn.dst_lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    sto_lkr.prepare(&sto_rangetree);
    sto_lkr.iterate(&migrate_fn);
    sto_lkr.remove_all();
    sto_lkr.release();
    sto_rangetree.destroy();
    invariant(!m_rangetree->is_empty());
}

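// try to take this lock through the single txnid optimization. returns true
// if the lock was recorded in the sto buffer, false if the caller must fall
// back to acquire_lock_consolidated() against the rangetree.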
bool locktree::sto_try_acquire(void *prepared_lkr,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key) {
    if (m_rangetree->is_empty() && m_sto_buffer.is_empty() &&
            toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
        // We can do the optimization because the rangetree is empty, and
        // we know it's worth trying because the sto score is high enough.
        sto_begin(txnid);
    } else if (m_sto_txnid != TXNID_NONE) {
        // We are currently doing the optimization. Check if we need to cancel
        // it because a new txnid appeared, or because the current single txnid
        // has taken too many locks already.
        if (m_sto_txnid != txnid || m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
            sto_end_early(prepared_lkr);
        }
    }

    // At this point the sto txnid is properly set. If it is valid, then
    // this txnid can append its lock to the sto buffer successfully.
    if (m_sto_txnid != TXNID_NONE) {
        invariant(m_sto_txnid == txnid);
        sto_append(left_key, right_key);
        return true;
    } else {
        invariant(m_sto_buffer.is_empty());
        return false;
    }
}

// try to acquire a lock and consolidate it with existing locks if possible
// param: lkr, a prepared locked keyrange
// return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
int locktree::acquire_lock_consolidated(void *prepared_lkr,
                                        TXNID txnid,
                                        const DBT *left_key, const DBT *right_key,
                                        txnid_set *conflicts) {
    int r = 0;
    concurrent_tree::locked_keyrange *lkr;

    keyrange requested_range;
    requested_range.create(left_key, right_key);
    lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    lkr->acquire(requested_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    // if any overlapping row locks conflict with this request, bail out.
    bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks,
                                                        txnid, conflicts);
    if (!conflicts_exist) {
        // there are no conflicts, so all of the overlaps are for the requesting txnid.
        // so, we must consolidate all existing overlapping ranges and the requested
        // range into one dominating range. then we insert the dominating range.
        for (size_t i = 0; i < num_overlapping_row_locks; i++) {
            row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
            invariant(overlapping_lock.txnid == txnid);
            requested_range.extend(m_cmp, overlapping_lock.range);
            remove_row_lock_from_tree(lkr, overlapping_lock, m_mgr);
        }

        row_lock new_lock = { .range = requested_range, .txnid = txnid };
        insert_row_lock_into_tree(lkr, new_lock, m_mgr);
    } else {
        r = DB_LOCK_NOTGRANTED;
    }

    requested_range.destroy();
    overlapping_row_locks.deinit();
    return r;
}

// acquire a lock in the given key range, inclusive. if successful,
// return 0. otherwise, populate the conflicts txnid_set with the set of
// transactions that conflict with this request.
int locktree::acquire_lock(bool is_write_request,
                           TXNID txnid,
                           const DBT *left_key, const DBT *right_key,
                           txnid_set *conflicts) {
    int r = 0;

    // we are only supporting write locks for simplicity
    invariant(is_write_request);

    // acquire and prepare a locked keyrange over the requested range.
    // prepare is a serialization point, so we take the opportunity to
    // try the single txnid optimization first.
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);

    bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key);
    if (!acquired) {
        r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key, conflicts);
    }

    lkr.release();
    return r;
}

int locktree::try_acquire_lock(bool is_write_request,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               txnid_set *conflicts, bool big_txn) {
    // All ranges in the locktree must have left endpoints <= right endpoints.
    // Range comparisons rely on this fact, so we make a paranoid invariant here.
    paranoid_invariant(m_cmp(left_key, right_key) <= 0);
    int r = m_mgr == nullptr ? 0 :
            m_mgr->check_current_lock_constraints(big_txn);
    if (r == 0) {
        r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
    }
    return r;
}

// the locktree silently upgrades read locks to write locks for simplicity
int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                txnid_set *conflicts, bool big_txn) {
    return acquire_write_lock(txnid, left_key, right_key, conflicts, big_txn);
}

int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                 txnid_set *conflicts, bool big_txn) {
    return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
}

void locktree::get_conflicts(bool is_write_request,
                             TXNID txnid, const DBT *left_key, const DBT *right_key,
                             txnid_set *conflicts) {
    // because we only support write locks, ignore this bit for now.
    (void) is_write_request;

    // prepare and acquire a locked keyrange over the requested range
    keyrange range;
    range.create(left_key, right_key);
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(range);

    // copy out the set of overlapping row locks and determine the conflicts
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);

    // we don't care if conflicts exist. we just want the conflicts set populated.
    (void) determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);

    lkr.release();
    overlapping_row_locks.deinit();
    range.destroy();
}

// Effect:
//  For each range in the lock tree that overlaps the given range and has
//  the given txnid, remove it.
// Rationale:
//  In the common case, there is only the range [left_key, right_key] and
//  it is associated with txnid, so this is a single tree delete.
//
//  However, consolidation and escalation change the objects in the tree
//  without telling the txn anything. In this case, the txn may own a
//  large range lock that represents its ownership of many smaller range
//  locks. For example, the txn may think it owns point locks on keys 1,
//  2, and 3, but due to escalation, only the object [1,3] exists in the
//  tree.
//
//  The first call for a small lock will remove the large range lock, and
//  the rest of the calls should do nothing. After the first release,
//  another thread can acquire one of the locks that the txn thinks it
//  still owns. That's ok, because the txn doesn't want it anymore (it
//  unlocks everything at once), but it may find a lock that it does not
//  own.
//
//  In our example, the txn unlocks key 1, which actually removes the
//  whole lock [1,3]. Now, someone else can lock 2 before our txn gets
//  around to unlocking 2, so we should not remove that lock.
void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
                                                  const DBT *left_key,
                                                  const DBT *right_key) {
    keyrange release_range;
    release_range.create(left_key, right_key);

    // acquire and prepare a locked keyrange over the release range
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(release_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    for (size_t i = 0; i < num_overlapping_row_locks; i++) {
        row_lock lock = overlapping_row_locks.fetch_unchecked(i);
        // If this isn't our lock, that's ok, just don't remove it.
        // See rationale above.
        if (lock.txnid == txnid) {
            remove_row_lock_from_tree(&lkr, lock, m_mgr);
        }
    }

    lkr.release();
    overlapping_row_locks.deinit();
    release_range.destroy();
}

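// the "unsafe" accessors read the optimization state without a prepared
// locked keyrange (which is what protects the optimization bits), so the
// values they return may be stale.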
bool locktree::sto_txnid_is_valid_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
}

int locktree::sto_get_score_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_score);
}

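// if this txnid is the single txnid on the locktree, release everything it
// holds by resetting the sto buffer. returns true if the release was handled
// here, false if the caller must walk the rangetree instead.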
bool locktree::sto_try_release(TXNID txnid) {
    bool released = false;
    if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
        // check the bit again with a prepared locked keyrange,
        // which protects the optimization bits and rangetree data
        concurrent_tree::locked_keyrange lkr;
        lkr.prepare(m_rangetree);
        if (m_sto_txnid != TXNID_NONE) {
            // this txnid better be the single txnid on this locktree,
            // or else we are in big trouble (meaning the logic is broken)
            invariant(m_sto_txnid == txnid);
            invariant(m_rangetree->is_empty());
            sto_end();
            released = true;
        }
        lkr.release();
    }
    return released;
}

// release all of the locks for a txnid whose endpoints are pairs
// in the given range buffer.
void locktree::release_locks(TXNID txnid, const range_buffer *ranges) {
    // try the single txn optimization. if it worked, then all of the
    // locks are already released, otherwise we need to do it here.
    bool released = sto_try_release(txnid);
    if (!released) {
        range_buffer::iterator iter(ranges);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            const DBT *left_key = rec.get_left_key();
            const DBT *right_key = rec.get_right_key();
            // All ranges in the locktree must have left endpoints <= right endpoints.
            // Range comparisons rely on this fact, so we make a paranoid invariant here.
            paranoid_invariant(m_cmp(left_key, right_key) <= 0);
            remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
            iter.next();
        }
        // Increase the sto score slightly. Eventually it will hit
        // the threshold and we'll try the optimization again. This
        // is how a previously multithreaded system transitions into
        // a single threaded system that benefits from the optimization.
        if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
            toku_sync_fetch_and_add(&m_sto_score, 1);
        }
    }
}

// iterate over a locked keyrange and extract copies of the first N
// row locks, storing each one into the given array of size N,
// then removing each extracted lock from the locked keyrange.
static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
                                     locktree_manager *mgr,
                                     row_lock *row_locks, int num_to_extract) {

    struct extract_fn_obj {
        int num_extracted;
        int num_to_extract;
        row_lock *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            if (num_extracted < num_to_extract) {
                row_lock lock;
                lock.range.create_copy(range);
                lock.txnid = txnid;
                row_locks[num_extracted++] = lock;
                return true;
            } else {
                return false;
            }
        }
    } extract_fn;

    extract_fn.row_locks = row_locks;
    extract_fn.num_to_extract = num_to_extract;
    extract_fn.num_extracted = 0;
    lkr->iterate(&extract_fn);

    // now that the ranges have been copied out, complete
    // the extraction by removing the ranges from the tree.
    // use remove_row_lock_from_tree() so we properly track the
    // amount of memory and number of locks freed.
    int num_extracted = extract_fn.num_extracted;
    invariant(num_extracted <= num_to_extract);
    for (int i = 0; i < num_extracted; i++) {
        remove_row_lock_from_tree(lkr, row_locks[i], mgr);
    }

    return num_extracted;
}

// Store each newly escalated lock in a range buffer for the appropriate
// txnid. We'll rebuild the locktree by iterating over these ranges, and
// then we can pass back each txnid/buffer pair individually through a
// callback to notify higher layers that locks have changed.
struct txnid_range_buffer {
    TXNID txnid;
    range_buffer buffer;

    static int find_by_txnid(struct txnid_range_buffer *const &other_buffer, const TXNID &txnid) {
        if (txnid < other_buffer->txnid) {
            return -1;
        } else if (other_buffer->txnid == txnid) {
            return 0;
        } else {
            return 1;
        }
    }
};

// escalate the locks in the locktree by merging adjacent
// locks that have the same txnid into one larger lock.
//
// if there's only one txnid in the locktree then this
// approach works well. if there are many txnids and each
// has locks in a random/alternating order, then this does
// not work so well.
void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
    omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
    range_buffers.create();

    // prepare and acquire a locked keyrange on the entire locktree
    concurrent_tree::locked_keyrange lkr;
    keyrange infinite_range = keyrange::get_infinite_range();
    lkr.prepare(m_rangetree);
    lkr.acquire(infinite_range);

    // if we're in the single txnid optimization, simply call it off.
    // if you have to run escalation, you probably don't care about
    // the optimization anyway, and this makes things easier.
    if (m_sto_txnid != TXNID_NONE) {
        // We are already accounting for this escalation time and
        // count, so don't do it for sto_end_early too.
        sto_end_early_no_accounting(&lkr);
    }

    // extract and remove batches of row locks from the locktree
    int num_extracted;
    const int num_row_locks_per_batch = 128;
    row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);

    // we always remove the "first" n because we are removing n
    // each time we do an extraction. so this loops until it's empty.
    while ((num_extracted =
                extract_first_n_row_locks(&lkr, m_mgr, extracted_buf,
                                          num_row_locks_per_batch)) > 0) {
        int current_index = 0;
        while (current_index < num_extracted) {
            // every batch of extracted locks is in range-sorted order. search
            // through them and merge adjacent locks with the same txnid into
            // one dominating lock and save it to a set of escalated locks.
            //
            // first, find the index of the next row lock with a different txnid
            int next_txnid_index = current_index + 1;
            while (next_txnid_index < num_extracted &&
                   extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) {
                next_txnid_index++;
            }

            // Create an escalated range for the current txnid that dominates
            // each range between the current index and the next txnid's index.
            const TXNID current_txnid = extracted_buf[current_index].txnid;
            const DBT *escalated_left_key = extracted_buf[current_index].range.get_left_key();
            const DBT *escalated_right_key = extracted_buf[next_txnid_index - 1].range.get_right_key();

            // Try to find a range buffer for the current txnid. Create one if it doesn't exist.
            // Then, append the new escalated range to the buffer.
            uint32_t idx;
            struct txnid_range_buffer *existing_range_buffer;
            int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
                current_txnid,
                &existing_range_buffer,
                &idx
                );
            if (r == DB_NOTFOUND) {
                struct txnid_range_buffer *XMALLOC(new_range_buffer);
                new_range_buffer->txnid = current_txnid;
                new_range_buffer->buffer.create();
                new_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
                range_buffers.insert_at(new_range_buffer, idx);
            } else {
                invariant_zero(r);
                invariant(existing_range_buffer->txnid == current_txnid);
                existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
            }

            current_index = next_txnid_index;
        }

        // destroy the ranges copied during the extraction
        for (int i = 0; i < num_extracted; i++) {
            extracted_buf[i].range.destroy();
        }
    }
    toku_free(extracted_buf);

    // Rebuild the locktree from each range in each range buffer,
    // then notify higher layers that the txnid's locks have changed.
    invariant(m_rangetree->is_empty());
    const size_t num_range_buffers = range_buffers.size();
    for (size_t i = 0; i < num_range_buffers; i++) {
        struct txnid_range_buffer *current_range_buffer;
        int r = range_buffers.fetch(i, &current_range_buffer);
        invariant_zero(r);

        const TXNID current_txnid = current_range_buffer->txnid;
        range_buffer::iterator iter(&current_range_buffer->buffer);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            keyrange range;
            range.create(rec.get_left_key(), rec.get_right_key());
            row_lock lock = { .range = range, .txnid = current_txnid };
            insert_row_lock_into_tree(&lkr, lock, m_mgr);
            iter.next();
        }

        // Notify higher layers that locks have changed for the current txnid
        if (after_escalate_callback) {
            after_escalate_callback(current_txnid, this, current_range_buffer->buffer, after_escalate_callback_extra);
        }
        current_range_buffer->buffer.destroy();
    }

    while (range_buffers.size() > 0) {
        struct txnid_range_buffer *buffer;
        int r = range_buffers.fetch(0, &buffer);
        invariant_zero(r);
        r = range_buffers.delete_at(0);
        invariant_zero(r);
        toku_free(buffer);
    }
    range_buffers.destroy();

    lkr.release();
}

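// simple accessors. userdata is an opaque pointer owned by the locktree's
// user; the locktree only stores it.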
void *locktree::get_userdata(void) const {
    return m_userdata;
}

void locktree::set_userdata(void *userdata) {
    m_userdata = userdata;
}

struct lt_lock_request_info *locktree::get_lock_request_info(void) {
    return &m_lock_request_info;
}

void locktree::set_comparator(const comparator &cmp) {
    m_cmp.inherit(cmp);
}

locktree_manager *locktree::get_manager(void) const {
    return m_mgr;
}

int locktree::compare(const locktree *lt) const {
    if (m_dict_id.dictid < lt->m_dict_id.dictid) {
        return -1;
    } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
        return 0;
    } else {
        return 1;
    }
}

DICTIONARY_ID locktree::get_dict_id() const {
    return m_dict_id;
}

} /* namespace toku */