/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

PerconaFT is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License, version 3,
as published by the Free Software Foundation.

PerconaFT is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <memory.h>

#include <util/growable_array.h>

#include <portability/toku_pthread.h>
#include <portability/toku_time.h>

#include "locktree.h"
#include "range_buffer.h"

// including the concurrent_tree here expands the templates
// and "defines" the implementation, so we do it here in
// the locktree source file instead of the header.
#include "concurrent_tree.h"

namespace toku {

// A locktree represents the set of row locks owned by all transactions
// over an open dictionary. Read and write ranges are represented as
// a left and right key which are compared with the given descriptor
// and comparison fn.
//
// Each locktree maintains a reference count, but takes no action based
// on its value; it is up to the user of the locktree to destroy it when
// it sees fit.

void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) {
    m_mgr = mgr;
    m_dict_id = dict_id;

    m_cmp.create_from(cmp);
    m_reference_count = 1;
    m_userdata = nullptr;

    XCALLOC(m_rangetree);
    m_rangetree->create(&m_cmp);

    m_sto_txnid = TXNID_NONE;
    m_sto_buffer.create();
    m_sto_score = STO_SCORE_THRESHOLD;
    m_sto_end_early_count = 0;
    m_sto_end_early_time = 0;

    m_lock_request_info.init();
}

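// initialize the shared lock request state: the pending request list,
// the mutexes and condition variable used to coordinate lock request
// retries, and the status counters. pending_is_empty is excluded from
// race detection because it is intentionally read without the mutex.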
void lt_lock_request_info::init(void) {
    pending_lock_requests.create();
    pending_is_empty = true;
    ZERO_STRUCT(mutex);
    toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr);
    retry_want = retry_done = 0;
    ZERO_STRUCT(counters);
    ZERO_STRUCT(retry_mutex);
    toku_mutex_init(
        *locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
    toku_cond_init(*locktree_request_info_retry_cv_key, &retry_cv, nullptr);
    running_retry = false;

    TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
                                      sizeof(pending_is_empty));
    TOKU_DRD_IGNORE_VAR(pending_is_empty);
}

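// destroy a locktree. the caller must ensure that the reference count
// has already dropped to zero and that no lock requests are pending.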
void locktree::destroy(void) {
    invariant(m_reference_count == 0);
    invariant(m_lock_request_info.pending_lock_requests.size() == 0);
    m_cmp.destroy();
    m_rangetree->destroy();
    toku_free(m_rangetree);
    m_sto_buffer.destroy();
    m_lock_request_info.destroy();
}

void lt_lock_request_info::destroy(void) {
    pending_lock_requests.destroy();
    toku_mutex_destroy(&mutex);
    toku_mutex_destroy(&retry_mutex);
    toku_cond_destroy(&retry_cv);
}

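// the reference count is manipulated with atomic operations, so callers
// may add and release references concurrently without external locking.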
void locktree::add_reference(void) {
    (void)toku_sync_add_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::release_reference(void) {
    return toku_sync_sub_and_fetch(&m_reference_count, 1);
}

uint32_t locktree::get_reference_count(void) {
    return m_reference_count;
}

// a container for a range/txnid pair
struct row_lock {
    keyrange range;
    TXNID txnid;
};

// iterate over a locked keyrange and copy out all of the data,
// storing each row lock into the given growable array. the
// caller does not own the range inside the returned row locks,
// so remove from the tree with care using them as keys.
static void iterate_and_get_overlapping_row_locks(const concurrent_tree::locked_keyrange *lkr,
                                                  GrowableArray<row_lock> *row_locks) {
    struct copy_fn_obj {
        GrowableArray<row_lock> *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            row_lock lock = { .range = range, .txnid = txnid };
            row_locks->push(lock);
            return true;
        }
    } copy_fn;
    copy_fn.row_locks = row_locks;
    lkr->iterate(&copy_fn);
}

// given a txnid and a set of overlapping row locks, determine
// which txnids are conflicting, and store them in the conflicts
// set, if given.
static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_locks,
                                         const TXNID &txnid, txnid_set *conflicts) {
    bool conflicts_exist = false;
    const size_t num_overlaps = row_locks.get_size();
    for (size_t i = 0; i < num_overlaps; i++) {
        const row_lock lock = row_locks.fetch_unchecked(i);
        const TXNID other_txnid = lock.txnid;
        if (other_txnid != txnid) {
            if (conflicts) {
                conflicts->add(other_txnid);
            }
            conflicts_exist = true;
        }
    }
    return conflicts_exist;
}

// how much memory does a row lock take up in a concurrent tree?
static uint64_t row_lock_size_in_tree(const row_lock &lock) {
    const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
    return lock.range.get_memory_size() + overhead;
}

// remove and destroy the given row lock from the locked keyrange,
// then notify the memory tracker of the newly freed lock.
static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    const uint64_t mem_released = row_lock_size_in_tree(lock);
    lkr->remove(lock.range);
    if (mgr != nullptr) {
        mgr->note_mem_released(mem_released);
    }
}

// insert a row lock into the locked keyrange, then notify
// the memory tracker of this newly acquired lock.
static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                      const row_lock &lock, locktree_manager *mgr) {
    uint64_t mem_used = row_lock_size_in_tree(lock);
    lkr->insert(lock.range, lock.txnid);
    if (mgr != nullptr) {
        mgr->note_mem_used(mem_used);
    }
}

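// The "single txnid optimization" (sto):
//
// As long as only one transaction is acquiring locks in this locktree,
// its ranges are appended to a simple buffer (m_sto_buffer) instead of
// being inserted into the rangetree. If a second txnid shows up, or the
// buffer grows beyond STO_BUFFER_MAX_SIZE ranges, the buffered ranges
// are migrated into the rangetree and the optimization is switched off.
// The sto score tracks whether the optimization is worth attempting: it
// is reset to zero whenever the optimization has to end early, and is
// incremented on each lock release until it reaches STO_SCORE_THRESHOLD,
// at which point the optimization is tried again.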
void locktree::sto_begin(TXNID txnid) {
    invariant(m_sto_txnid == TXNID_NONE);
    invariant(m_sto_buffer.is_empty());
    m_sto_txnid = txnid;
}

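// append a range to the sto buffer on behalf of the single txnid,
// and notify the memory tracker of the newly used buffer memory.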
void locktree::sto_append(const DBT *left_key, const DBT *right_key) {
    uint64_t buffer_mem, delta;
    keyrange range;
    range.create(left_key, right_key);

    buffer_mem = m_sto_buffer.total_memory_size();
    m_sto_buffer.append(left_key, right_key);
    delta = m_sto_buffer.total_memory_size() - buffer_mem;
    if (m_mgr != nullptr) {
        m_mgr->note_mem_used(delta);
    }
}

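// end the single txnid optimization: tell the memory tracker that the
// buffer's memory has been released, reset the buffer, and clear the
// single txnid.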
void locktree::sto_end(void) {
    uint64_t mem_size = m_sto_buffer.total_memory_size();
    if (m_mgr != nullptr) {
        m_mgr->note_mem_released(mem_size);
    }
    m_sto_buffer.destroy();
    m_sto_buffer.create();
    m_sto_txnid = TXNID_NONE;
}

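// end the optimization early: migrate the buffered ranges into the given
// prepared locked keyrange, then reset the sto score to zero so the
// optimization is not retried until the score builds back up.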
void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
    sto_migrate_buffer_ranges_to_tree(prepared_lkr);
    sto_end();
    toku_unsafe_set(m_sto_score, 0);
}

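// same as sto_end_early_no_accounting, but also records how many times
// the optimization ended early and how much time was spent doing so.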
void locktree::sto_end_early(void *prepared_lkr) {
    m_sto_end_early_count++;

    tokutime_t t0 = toku_time_now();
    sto_end_early_no_accounting(prepared_lkr);
    tokutime_t t1 = toku_time_now();

    m_sto_end_early_time += (t1 - t0);
}

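// move the ranges in the sto buffer into the locktree's rangetree.
// the ranges are first consolidated in a temporary rangetree (so that
// overlapping ranges owned by the single txnid collapse into dominating
// ranges) and then inserted into the prepared locked keyrange.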
void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
    // There should be something to migrate, and nothing in the rangetree.
    invariant(!m_sto_buffer.is_empty());
    invariant(m_rangetree->is_empty());

    concurrent_tree sto_rangetree;
    concurrent_tree::locked_keyrange sto_lkr;
    sto_rangetree.create(&m_cmp);

    // insert all of the ranges from the single txnid buffer into a new rangetree
    range_buffer::iterator iter(&m_sto_buffer);
    range_buffer::iterator::record rec;
    while (iter.current(&rec)) {
        sto_lkr.prepare(&sto_rangetree);
        int r = acquire_lock_consolidated(&sto_lkr,
                                          m_sto_txnid, rec.get_left_key(), rec.get_right_key(), nullptr);
        invariant_zero(r);
        sto_lkr.release();
        iter.next();
    }

    // Iterate the newly created rangetree and insert each range into the
    // locktree's rangetree, on behalf of the old single txnid.
    struct migrate_fn_obj {
        concurrent_tree::locked_keyrange *dst_lkr;
        bool fn(const keyrange &range, TXNID txnid) {
            dst_lkr->insert(range, txnid);
            return true;
        }
    } migrate_fn;
    migrate_fn.dst_lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    sto_lkr.prepare(&sto_rangetree);
    sto_lkr.iterate(&migrate_fn);
    sto_lkr.remove_all();
    sto_lkr.release();
    sto_rangetree.destroy();
    invariant(!m_rangetree->is_empty());
}

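// try to acquire a lock using the single txnid optimization.
// returns true if the lock was taken by appending it to the sto buffer,
// false if the caller must take the slow path through the rangetree.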
bool locktree::sto_try_acquire(void *prepared_lkr,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key) {
    if (m_rangetree->is_empty() && m_sto_buffer.is_empty() && toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
        // We can do the optimization because the rangetree is empty, and
        // we know it's worth trying because the sto score is big enough.
        sto_begin(txnid);
    } else if (m_sto_txnid != TXNID_NONE) {
        // We are currently doing the optimization. Check if we need to cancel
        // it because a new txnid appeared, or if the current single txnid has
        // taken too many locks already.
        if (m_sto_txnid != txnid || m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
            sto_end_early(prepared_lkr);
        }
    }

    // At this point the sto txnid is properly set. If it is valid, then
    // this txnid can append its lock to the sto buffer successfully.
    if (m_sto_txnid != TXNID_NONE) {
        invariant(m_sto_txnid == txnid);
        sto_append(left_key, right_key);
        return true;
    } else {
        invariant(m_sto_buffer.is_empty());
        return false;
    }
}

// try to acquire a lock and consolidate it with existing locks if possible
// param: lkr, a prepared locked keyrange
// return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
int locktree::acquire_lock_consolidated(void *prepared_lkr,
                                        TXNID txnid,
                                        const DBT *left_key, const DBT *right_key,
                                        txnid_set *conflicts) {
    int r = 0;
    concurrent_tree::locked_keyrange *lkr;

    keyrange requested_range;
    requested_range.create(left_key, right_key);
    lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
    lkr->acquire(requested_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    // if any overlapping row locks conflict with this request, bail out.
    bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks,
                                                        txnid, conflicts);
    if (!conflicts_exist) {
        // there are no conflicts, so all of the overlaps are for the requesting txnid.
        // so, we must consolidate all existing overlapping ranges and the requested
        // range into one dominating range. then we insert the dominating range.
        for (size_t i = 0; i < num_overlapping_row_locks; i++) {
            row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
            invariant(overlapping_lock.txnid == txnid);
            requested_range.extend(m_cmp, overlapping_lock.range);
            remove_row_lock_from_tree(lkr, overlapping_lock, m_mgr);
        }

        row_lock new_lock = { .range = requested_range, .txnid = txnid };
        insert_row_lock_into_tree(lkr, new_lock, m_mgr);
    } else {
        r = DB_LOCK_NOTGRANTED;
    }

    requested_range.destroy();
    overlapping_row_locks.deinit();
    return r;
}

// acquire a lock in the given key range, inclusive. if successful,
// return 0. otherwise, populate the conflicts txnid_set with the set of
// transactions that conflict with this request.
int locktree::acquire_lock(bool is_write_request,
                           TXNID txnid,
                           const DBT *left_key, const DBT *right_key,
                           txnid_set *conflicts) {
    int r = 0;

    // we are only supporting write locks for simplicity
    invariant(is_write_request);

    // acquire and prepare a locked keyrange over the requested range.
    // prepare is a serialization point, so we take the opportunity to
    // try the single txnid optimization first.
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);

    bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key);
    if (!acquired) {
        r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key, conflicts);
    }

    lkr.release();
    return r;
}

int locktree::try_acquire_lock(bool is_write_request,
                               TXNID txnid,
                               const DBT *left_key, const DBT *right_key,
                               txnid_set *conflicts, bool big_txn) {
    // All ranges in the locktree must have left endpoints <= right endpoints.
    // Range comparisons rely on this fact, so we make a paranoid invariant here.
    paranoid_invariant(m_cmp(left_key, right_key) <= 0);
    int r = m_mgr == nullptr ? 0 :
            m_mgr->check_current_lock_constraints(big_txn);
    if (r == 0) {
        r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
    }
    return r;
}

// the locktree silently upgrades read locks to write locks for simplicity
int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                txnid_set *conflicts, bool big_txn) {
    return acquire_write_lock(txnid, left_key, right_key, conflicts, big_txn);
}

int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                 txnid_set *conflicts, bool big_txn) {
    return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
}

void locktree::get_conflicts(bool is_write_request,
                             TXNID txnid, const DBT *left_key, const DBT *right_key,
                             txnid_set *conflicts) {
    // because we only support write locks, ignore this bit for now.
    (void) is_write_request;

    // prepare and acquire a locked keyrange over the requested range
    keyrange range;
    range.create(left_key, right_key);
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(range);

    // copy out the set of overlapping row locks and determine the conflicts
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);

    // we don't care if conflicts exist. we just want the conflicts set populated.
    (void) determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);

    lkr.release();
    overlapping_row_locks.deinit();
    range.destroy();
}

// Effect:
//  For each range in the lock tree that overlaps the given range and has
//  the given txnid, remove it.
// Rationale:
//  In the common case, there is only the range [left_key, right_key] and
//  it is associated with txnid, so this is a single tree delete.
//
//  However, consolidation and escalation change the objects in the tree
//  without telling the txn anything. In this case, the txn may own a
//  large range lock that represents its ownership of many smaller range
//  locks. For example, the txn may think it owns point locks on keys 1,
//  2, and 3, but due to escalation, only the object [1,3] exists in the
//  tree.
//
//  The first call for a small lock will remove the large range lock, and
//  the rest of the calls should do nothing. After the first release,
//  another thread can acquire one of the locks that the txn thinks it
//  still owns. That's ok, because the txn doesn't want it anymore (it
//  unlocks everything at once), but it may find a lock that it does not
//  own.
//
//  In our example, the txn unlocks key 1, which actually removes the
//  whole lock [1,3]. Now, someone else can lock 2 before our txn gets
//  around to unlocking 2, so we should not remove that lock.
void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
                                                  const DBT *left_key,
                                                  const DBT *right_key) {
    keyrange release_range;
    release_range.create(left_key, right_key);

    // acquire and prepare a locked keyrange over the release range
    concurrent_tree::locked_keyrange lkr;
    lkr.prepare(m_rangetree);
    lkr.acquire(release_range);

    // copy out the set of overlapping row locks.
    GrowableArray<row_lock> overlapping_row_locks;
    overlapping_row_locks.init();
    iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
    size_t num_overlapping_row_locks = overlapping_row_locks.get_size();

    for (size_t i = 0; i < num_overlapping_row_locks; i++) {
        row_lock lock = overlapping_row_locks.fetch_unchecked(i);
        // If this isn't our lock, that's ok, just don't remove it.
        // See rationale above.
        if (lock.txnid == txnid) {
            remove_row_lock_from_tree(&lkr, lock, m_mgr);
        }
    }

    lkr.release();
    overlapping_row_locks.deinit();
    release_range.destroy();
}

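// unsafe accessors for the single txnid optimization state. the values
// are read without holding any lock, so callers must tolerate stale results.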
bool locktree::sto_txnid_is_valid_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
}

int locktree::sto_get_score_unsafe(void) const {
    return toku_unsafe_fetch(m_sto_score);
}

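// try to release all locks held under the single txnid optimization.
// returns true if the optimization was active for this txnid and the
// buffer was released, false if the caller must release locks from the
// rangetree instead.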
bool locktree::sto_try_release(TXNID txnid) {
    bool released = false;
    if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
        // check the bit again with a prepared locked keyrange,
        // which protects the optimization bits and rangetree data
        concurrent_tree::locked_keyrange lkr;
        lkr.prepare(m_rangetree);
        if (m_sto_txnid != TXNID_NONE) {
            // this txnid better be the single txnid on this locktree,
            // or else we are in big trouble (meaning the logic is broken)
            invariant(m_sto_txnid == txnid);
            invariant(m_rangetree->is_empty());
            sto_end();
            released = true;
        }
        lkr.release();
    }
    return released;
}

// release all of the locks for a txnid whose endpoints are pairs
// in the given range buffer.
void locktree::release_locks(TXNID txnid, const range_buffer *ranges) {
    // try the single txn optimization. if it worked, then all of the
    // locks are already released, otherwise we need to do it here.
    bool released = sto_try_release(txnid);
    if (!released) {
        range_buffer::iterator iter(ranges);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            const DBT *left_key = rec.get_left_key();
            const DBT *right_key = rec.get_right_key();
            // All ranges in the locktree must have left endpoints <= right endpoints.
            // Range comparisons rely on this fact, so we make a paranoid invariant here.
            paranoid_invariant(m_cmp(left_key, right_key) <= 0);
            remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
            iter.next();
        }
        // Increase the sto score slightly. Eventually it will hit
        // the threshold and we'll try the optimization again. This
        // is how a previously multithreaded system transitions into
        // a single threaded system that benefits from the optimization.
        if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
            toku_sync_fetch_and_add(&m_sto_score, 1);
        }
    }
}

// iterate over a locked keyrange and extract copies of the first N
// row locks, storing each one into the given array of size N,
// then removing each extracted lock from the locked keyrange.
static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
                                     locktree_manager *mgr,
                                     row_lock *row_locks, int num_to_extract) {

    struct extract_fn_obj {
        int num_extracted;
        int num_to_extract;
        row_lock *row_locks;
        bool fn(const keyrange &range, TXNID txnid) {
            if (num_extracted < num_to_extract) {
                row_lock lock;
                lock.range.create_copy(range);
                lock.txnid = txnid;
                row_locks[num_extracted++] = lock;
                return true;
            } else {
                return false;
            }
        }
    } extract_fn;

    extract_fn.row_locks = row_locks;
    extract_fn.num_to_extract = num_to_extract;
    extract_fn.num_extracted = 0;
    lkr->iterate(&extract_fn);

    // now that the ranges have been copied out, complete
    // the extraction by removing the ranges from the tree.
    // use remove_row_lock_from_tree() so we properly track the
    // amount of memory and number of locks freed.
    int num_extracted = extract_fn.num_extracted;
    invariant(num_extracted <= num_to_extract);
    for (int i = 0; i < num_extracted; i++) {
        remove_row_lock_from_tree(lkr, row_locks[i], mgr);
    }

    return num_extracted;
}

// Store each newly escalated lock in a range buffer for the appropriate
// txnid. We'll rebuild the locktree by iterating over these ranges, and
// then we can pass back each txnid/buffer pair individually through a
// callback to notify higher layers that locks have changed.
struct txnid_range_buffer {
    TXNID txnid;
    range_buffer buffer;

    static int find_by_txnid(struct txnid_range_buffer *const &other_buffer, const TXNID &txnid) {
        if (txnid < other_buffer->txnid) {
            return -1;
        } else if (other_buffer->txnid == txnid) {
            return 0;
        } else {
            return 1;
        }
    }
};

// escalate the locks in the locktree by merging adjacent
// locks that have the same txnid into one larger lock.
//
// if there's only one txnid in the locktree then this
// approach works well. if there are many txnids and each
// has locks in a random/alternating order, then this does
// not work so well.
void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
    omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
    range_buffers.create();

    // prepare and acquire a locked keyrange on the entire locktree
    concurrent_tree::locked_keyrange lkr;
    keyrange infinite_range = keyrange::get_infinite_range();
    lkr.prepare(m_rangetree);
    lkr.acquire(infinite_range);

    // if we're in the single txnid optimization, simply call it off.
    // if you have to run escalation, you probably don't care about
    // the optimization anyway, and this makes things easier.
    if (m_sto_txnid != TXNID_NONE) {
        // We are already accounting for this escalation time and
        // count, so don't do it for sto_end_early too.
        sto_end_early_no_accounting(&lkr);
    }

    // extract and remove batches of row locks from the locktree
    int num_extracted;
    const int num_row_locks_per_batch = 128;
    row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);

    // we always remove the "first" n because we are removing n
    // each time we do an extraction. so this loops until it's empty.
    while ((num_extracted =
                extract_first_n_row_locks(&lkr, m_mgr, extracted_buf,
                                          num_row_locks_per_batch)) > 0) {
        int current_index = 0;
        while (current_index < num_extracted) {
            // every batch of extracted locks is in range-sorted order. search
            // through them and merge adjacent locks with the same txnid into
            // one dominating lock and save it to a set of escalated locks.
            //
            // first, find the index of the next row lock with a different txnid
            int next_txnid_index = current_index + 1;
            while (next_txnid_index < num_extracted &&
                   extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) {
                next_txnid_index++;
            }

            // Create an escalated range for the current txnid that dominates
            // each range between the current index and the next txnid's index.
            const TXNID current_txnid = extracted_buf[current_index].txnid;
            const DBT *escalated_left_key = extracted_buf[current_index].range.get_left_key();
            const DBT *escalated_right_key = extracted_buf[next_txnid_index - 1].range.get_right_key();

            // Try to find a range buffer for the current txnid. Create one if it doesn't exist.
            // Then, append the new escalated range to the buffer.
            uint32_t idx;
            struct txnid_range_buffer *existing_range_buffer;
            int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
                current_txnid,
                &existing_range_buffer,
                &idx
            );
            if (r == DB_NOTFOUND) {
                struct txnid_range_buffer *XMALLOC(new_range_buffer);
                new_range_buffer->txnid = current_txnid;
                new_range_buffer->buffer.create();
                new_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
                range_buffers.insert_at(new_range_buffer, idx);
            } else {
                invariant_zero(r);
                invariant(existing_range_buffer->txnid == current_txnid);
                existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
            }

            current_index = next_txnid_index;
        }

        // destroy the ranges copied during the extraction
        for (int i = 0; i < num_extracted; i++) {
            extracted_buf[i].range.destroy();
        }
    }
    toku_free(extracted_buf);

    // Rebuild the locktree from each range in each range buffer,
    // then notify higher layers that the txnid's locks have changed.
    invariant(m_rangetree->is_empty());
    const size_t num_range_buffers = range_buffers.size();
    for (size_t i = 0; i < num_range_buffers; i++) {
        struct txnid_range_buffer *current_range_buffer;
        int r = range_buffers.fetch(i, &current_range_buffer);
        invariant_zero(r);

        const TXNID current_txnid = current_range_buffer->txnid;
        range_buffer::iterator iter(&current_range_buffer->buffer);
        range_buffer::iterator::record rec;
        while (iter.current(&rec)) {
            keyrange range;
            range.create(rec.get_left_key(), rec.get_right_key());
            row_lock lock = { .range = range, .txnid = current_txnid };
            insert_row_lock_into_tree(&lkr, lock, m_mgr);
            iter.next();
        }

        // Notify higher layers that locks have changed for the current txnid
        if (after_escalate_callback) {
            after_escalate_callback(current_txnid, this, current_range_buffer->buffer, after_escalate_callback_extra);
        }
        current_range_buffer->buffer.destroy();
    }

    while (range_buffers.size() > 0) {
        struct txnid_range_buffer *buffer;
        int r = range_buffers.fetch(0, &buffer);
        invariant_zero(r);
        r = range_buffers.delete_at(0);
        invariant_zero(r);
        toku_free(buffer);
    }
    range_buffers.destroy();

    lkr.release();
}

void *locktree::get_userdata(void) const {
    return m_userdata;
}

void locktree::set_userdata(void *userdata) {
    m_userdata = userdata;
}

struct lt_lock_request_info *locktree::get_lock_request_info(void) {
    return &m_lock_request_info;
}

void locktree::set_comparator(const comparator &cmp) {
    m_cmp.inherit(cmp);
}

locktree_manager *locktree::get_manager(void) const {
    return m_mgr;
}

int locktree::compare(const locktree *lt) const {
    if (m_dict_id.dictid < lt->m_dict_id.dictid) {
        return -1;
    } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
        return 0;
    } else {
        return 1;
    }
}

DICTIONARY_ID locktree::get_dict_id() const {
    return m_dict_id;
}

} /* namespace toku */