1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <memory.h> |
40 | |
41 | #include <util/growable_array.h> |
42 | |
43 | #include <portability/toku_pthread.h> |
44 | #include <portability/toku_time.h> |
45 | |
46 | #include "locktree.h" |
47 | #include "range_buffer.h" |
48 | |
// including concurrent_tree.h here expands the templates and
// "defines" the implementation, so we include it in the locktree
// source file instead of in the header.
52 | #include "concurrent_tree.h" |
53 | |
54 | namespace toku { |
55 | |
56 | // A locktree represents the set of row locks owned by all transactions |
57 | // over an open dictionary. Read and write ranges are represented as |
58 | // a left and right key which are compared with the given descriptor |
59 | // and comparison fn. |
60 | // |
// Each locktree maintains a reference count but takes no action based
// on its value - it is up to the user of the locktree to destroy it
// when it sees fit.
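//
// Typical call sequence (a minimal sketch; in practice locktrees are created
// and reference-counted through the locktree_manager rather than used
// standalone, and names like txn_owned_ranges below are illustrative):
//
//   locktree lt;
//   lt.create(mgr, dict_id, cmp);
//   txnid_set conflicts;
//   int r = lt.acquire_write_lock(txnid, left_key, right_key, &conflicts, false);
//   if (r == 0) {
//       // ... work under the lock, recording the range in a range_buffer
//       // owned by the txn ...
//       lt.release_locks(txnid, &txn_owned_ranges);
//   }
//   if (lt.release_reference() == 0) {
//       lt.destroy();
//   }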
64 | |
65 | void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) { |
66 | m_mgr = mgr; |
67 | m_dict_id = dict_id; |
68 | |
69 | m_cmp.create_from(cmp); |
70 | m_reference_count = 1; |
71 | m_userdata = nullptr; |
72 | |
73 | XCALLOC(m_rangetree); |
74 | m_rangetree->create(&m_cmp); |
75 | |
76 | m_sto_txnid = TXNID_NONE; |
77 | m_sto_buffer.create(); |
78 | m_sto_score = STO_SCORE_THRESHOLD; |
79 | m_sto_end_early_count = 0; |
80 | m_sto_end_early_time = 0; |
81 | |
82 | m_lock_request_info.init(); |
83 | } |
84 | |
85 | void lt_lock_request_info::init(void) { |
86 | pending_lock_requests.create(); |
87 | pending_is_empty = true; |
88 | ZERO_STRUCT(mutex); |
89 | toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr); |
90 | retry_want = retry_done = 0; |
91 | ZERO_STRUCT(counters); |
92 | ZERO_STRUCT(retry_mutex); |
93 | toku_mutex_init( |
94 | *locktree_request_info_retry_mutex_key, &retry_mutex, nullptr); |
95 | toku_cond_init(*locktree_request_info_retry_cv_key, &retry_cv, nullptr); |
96 | running_retry = false; |
97 | |
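    // pending_is_empty is read opportunistically without holding the mutex
    // as a cheap "any pending requests?" hint, which is presumably why race
    // checking is disabled for it below.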
98 | TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty, |
99 | sizeof(pending_is_empty)); |
100 | TOKU_DRD_IGNORE_VAR(pending_is_empty); |
101 | } |
102 | |
103 | void locktree::destroy(void) { |
104 | invariant(m_reference_count == 0); |
105 | invariant(m_lock_request_info.pending_lock_requests.size() == 0); |
106 | m_cmp.destroy(); |
107 | m_rangetree->destroy(); |
108 | toku_free(m_rangetree); |
109 | m_sto_buffer.destroy(); |
110 | m_lock_request_info.destroy(); |
111 | } |
112 | |
113 | void lt_lock_request_info::destroy(void) { |
114 | pending_lock_requests.destroy(); |
115 | toku_mutex_destroy(&mutex); |
116 | toku_mutex_destroy(&retry_mutex); |
117 | toku_cond_destroy(&retry_cv); |
118 | } |
119 | |
120 | void locktree::add_reference(void) { |
121 | (void)toku_sync_add_and_fetch(&m_reference_count, 1); |
122 | } |
123 | |
124 | uint32_t locktree::release_reference(void) { |
125 | return toku_sync_sub_and_fetch(&m_reference_count, 1); |
126 | } |
127 | |
128 | uint32_t locktree::get_reference_count(void) { |
129 | return m_reference_count; |
130 | } |
131 | |
132 | // a container for a range/txnid pair |
133 | struct row_lock { |
134 | keyrange range; |
135 | TXNID txnid; |
136 | }; |
137 | |
// iterate over a locked keyrange and copy out all of the data,
// storing each row lock into the given growable array. the
// caller does not own the ranges inside the returned row locks,
// so take care when using them as keys to remove locks from the tree.
142 | static void iterate_and_get_overlapping_row_locks(const concurrent_tree::locked_keyrange *lkr, |
143 | GrowableArray<row_lock> *row_locks) { |
144 | struct copy_fn_obj { |
145 | GrowableArray<row_lock> *row_locks; |
146 | bool fn(const keyrange &range, TXNID txnid) { |
147 | row_lock lock = { .range = range, .txnid = txnid }; |
148 | row_locks->push(lock); |
149 | return true; |
150 | } |
151 | } copy_fn; |
152 | copy_fn.row_locks = row_locks; |
153 | lkr->iterate(©_fn); |
154 | } |
155 | |
156 | // given a txnid and a set of overlapping row locks, determine |
157 | // which txnids are conflicting, and store them in the conflicts |
158 | // set, if given. |
159 | static bool determine_conflicting_txnids(const GrowableArray<row_lock> &row_locks, |
160 | const TXNID &txnid, txnid_set *conflicts) { |
161 | bool conflicts_exist = false; |
162 | const size_t num_overlaps = row_locks.get_size(); |
163 | for (size_t i = 0; i < num_overlaps; i++) { |
164 | const row_lock lock = row_locks.fetch_unchecked(i); |
165 | const TXNID other_txnid = lock.txnid; |
166 | if (other_txnid != txnid) { |
167 | if (conflicts) { |
168 | conflicts->add(other_txnid); |
169 | } |
170 | conflicts_exist = true; |
171 | } |
172 | } |
173 | return conflicts_exist; |
174 | } |
175 | |
176 | // how much memory does a row lock take up in a concurrent tree? |
177 | static uint64_t row_lock_size_in_tree(const row_lock &lock) { |
178 | const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead(); |
179 | return lock.range.get_memory_size() + overhead; |
180 | } |
181 | |
182 | // remove and destroy the given row lock from the locked keyrange, |
183 | // then notify the memory tracker of the newly freed lock. |
184 | static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr, |
185 | const row_lock &lock, locktree_manager *mgr) { |
186 | const uint64_t mem_released = row_lock_size_in_tree(lock); |
187 | lkr->remove(lock.range); |
188 | if (mgr != nullptr) { |
189 | mgr->note_mem_released(mem_released); |
190 | } |
191 | } |
192 | |
193 | // insert a row lock into the locked keyrange, then notify |
194 | // the memory tracker of this newly acquired lock. |
195 | static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr, |
196 | const row_lock &lock, locktree_manager *mgr) { |
197 | uint64_t mem_used = row_lock_size_in_tree(lock); |
198 | lkr->insert(lock.range, lock.txnid); |
199 | if (mgr != nullptr) { |
200 | mgr->note_mem_used(mem_used); |
201 | } |
202 | } |
203 | |
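// The sto_* functions below implement the "single txnid optimization" (STO):
// as long as exactly one transaction (m_sto_txnid) is taking locks in this
// locktree and the rangetree is empty, acquired ranges are appended to
// m_sto_buffer instead of being inserted into the rangetree, avoiding tree
// maintenance on the hot path. The optimization is called off (sto_end_early)
// as soon as a different txnid shows up or the buffer exceeds
// STO_BUFFER_MAX_SIZE ranges, at which point the buffered ranges are migrated
// into the rangetree on behalf of the single txnid.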
204 | void locktree::sto_begin(TXNID txnid) { |
205 | invariant(m_sto_txnid == TXNID_NONE); |
206 | invariant(m_sto_buffer.is_empty()); |
207 | m_sto_txnid = txnid; |
208 | } |
209 | |
210 | void locktree::sto_append(const DBT *left_key, const DBT *right_key) { |
211 | uint64_t buffer_mem, delta; |
212 | keyrange range; |
213 | range.create(left_key, right_key); |
214 | |
215 | buffer_mem = m_sto_buffer.total_memory_size(); |
216 | m_sto_buffer.append(left_key, right_key); |
217 | delta = m_sto_buffer.total_memory_size() - buffer_mem; |
218 | if (m_mgr != nullptr) { |
219 | m_mgr->note_mem_used(delta); |
220 | } |
221 | } |
222 | |
223 | void locktree::sto_end(void) { |
224 | uint64_t mem_size = m_sto_buffer.total_memory_size(); |
225 | if (m_mgr != nullptr) { |
226 | m_mgr->note_mem_released(mem_size); |
227 | } |
228 | m_sto_buffer.destroy(); |
229 | m_sto_buffer.create(); |
230 | m_sto_txnid = TXNID_NONE; |
231 | } |
232 | |
233 | void locktree::sto_end_early_no_accounting(void *prepared_lkr) { |
234 | sto_migrate_buffer_ranges_to_tree(prepared_lkr); |
235 | sto_end(); |
236 | toku_unsafe_set(m_sto_score, 0); |
237 | } |
238 | |
239 | void locktree::sto_end_early(void *prepared_lkr) { |
240 | m_sto_end_early_count++; |
241 | |
242 | tokutime_t t0 = toku_time_now(); |
243 | sto_end_early_no_accounting(prepared_lkr); |
244 | tokutime_t t1 = toku_time_now(); |
245 | |
246 | m_sto_end_early_time += (t1 - t0); |
247 | } |
248 | |
249 | void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) { |
250 | // There should be something to migrate, and nothing in the rangetree. |
251 | invariant(!m_sto_buffer.is_empty()); |
252 | invariant(m_rangetree->is_empty()); |
253 | |
254 | concurrent_tree sto_rangetree; |
255 | concurrent_tree::locked_keyrange sto_lkr; |
256 | sto_rangetree.create(&m_cmp); |
257 | |
    // insert all of the ranges from the single txnid buffer into a new rangetree
259 | range_buffer::iterator iter(&m_sto_buffer); |
260 | range_buffer::iterator::record rec; |
261 | while (iter.current(&rec)) { |
262 | sto_lkr.prepare(&sto_rangetree); |
263 | int r = acquire_lock_consolidated(&sto_lkr, |
264 | m_sto_txnid, rec.get_left_key(), rec.get_right_key(), nullptr); |
265 | invariant_zero(r); |
266 | sto_lkr.release(); |
267 | iter.next(); |
268 | } |
269 | |
270 | // Iterate the newly created rangetree and insert each range into the |
271 | // locktree's rangetree, on behalf of the old single txnid. |
272 | struct migrate_fn_obj { |
273 | concurrent_tree::locked_keyrange *dst_lkr; |
274 | bool fn(const keyrange &range, TXNID txnid) { |
275 | dst_lkr->insert(range, txnid); |
276 | return true; |
277 | } |
278 | } migrate_fn; |
279 | migrate_fn.dst_lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr); |
280 | sto_lkr.prepare(&sto_rangetree); |
281 | sto_lkr.iterate(&migrate_fn); |
282 | sto_lkr.remove_all(); |
283 | sto_lkr.release(); |
284 | sto_rangetree.destroy(); |
285 | invariant(!m_rangetree->is_empty()); |
286 | } |
287 | |
288 | bool locktree::sto_try_acquire(void *prepared_lkr, |
289 | TXNID txnid, |
290 | const DBT *left_key, const DBT *right_key) { |
291 | if (m_rangetree->is_empty() && m_sto_buffer.is_empty() && toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) { |
292 | // We can do the optimization because the rangetree is empty, and |
        // we know it's worth trying because the sto score is big enough.
294 | sto_begin(txnid); |
295 | } else if (m_sto_txnid != TXNID_NONE) { |
296 | // We are currently doing the optimization. Check if we need to cancel |
297 | // it because a new txnid appeared, or if the current single txnid has |
298 | // taken too many locks already. |
299 | if (m_sto_txnid != txnid || m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) { |
300 | sto_end_early(prepared_lkr); |
301 | } |
302 | } |
303 | |
304 | // At this point the sto txnid is properly set. If it is valid, then |
305 | // this txnid can append its lock to the sto buffer successfully. |
306 | if (m_sto_txnid != TXNID_NONE) { |
307 | invariant(m_sto_txnid == txnid); |
308 | sto_append(left_key, right_key); |
309 | return true; |
310 | } else { |
311 | invariant(m_sto_buffer.is_empty()); |
312 | return false; |
313 | } |
314 | } |
315 | |
316 | // try to acquire a lock and consolidate it with existing locks if possible |
317 | // param: lkr, a prepared locked keyrange |
318 | // return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist. |
319 | int locktree::acquire_lock_consolidated(void *prepared_lkr, |
320 | TXNID txnid, |
321 | const DBT *left_key, const DBT *right_key, |
322 | txnid_set *conflicts) { |
323 | int r = 0; |
324 | concurrent_tree::locked_keyrange *lkr; |
325 | |
326 | keyrange requested_range; |
327 | requested_range.create(left_key, right_key); |
328 | lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr); |
329 | lkr->acquire(requested_range); |
330 | |
331 | // copy out the set of overlapping row locks. |
332 | GrowableArray<row_lock> overlapping_row_locks; |
333 | overlapping_row_locks.init(); |
334 | iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks); |
335 | size_t num_overlapping_row_locks = overlapping_row_locks.get_size(); |
336 | |
337 | // if any overlapping row locks conflict with this request, bail out. |
338 | bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks, |
339 | txnid, conflicts); |
340 | if (!conflicts_exist) { |
341 | // there are no conflicts, so all of the overlaps are for the requesting txnid. |
342 | // so, we must consolidate all existing overlapping ranges and the requested |
343 | // range into one dominating range. then we insert the dominating range. |
344 | for (size_t i = 0; i < num_overlapping_row_locks; i++) { |
345 | row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i); |
346 | invariant(overlapping_lock.txnid == txnid); |
347 | requested_range.extend(m_cmp, overlapping_lock.range); |
348 | remove_row_lock_from_tree(lkr, overlapping_lock, m_mgr); |
349 | } |
350 | |
351 | row_lock new_lock = { .range = requested_range, .txnid = txnid }; |
352 | insert_row_lock_into_tree(lkr, new_lock, m_mgr); |
353 | } else { |
354 | r = DB_LOCK_NOTGRANTED; |
355 | } |
356 | |
357 | requested_range.destroy(); |
358 | overlapping_row_locks.deinit(); |
359 | return r; |
360 | } |
361 | |
362 | // acquire a lock in the given key range, inclusive. if successful, |
363 | // return 0. otherwise, populate the conflicts txnid_set with the set of |
364 | // transactions that conflict with this request. |
365 | int locktree::acquire_lock(bool is_write_request, |
366 | TXNID txnid, |
367 | const DBT *left_key, const DBT *right_key, |
368 | txnid_set *conflicts) { |
369 | int r = 0; |
370 | |
371 | // we are only supporting write locks for simplicity |
372 | invariant(is_write_request); |
373 | |
374 | // acquire and prepare a locked keyrange over the requested range. |
    // prepare is a serialization point, so we take the opportunity to
376 | // try the single txnid optimization first. |
377 | concurrent_tree::locked_keyrange lkr; |
378 | lkr.prepare(m_rangetree); |
379 | |
380 | bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key); |
381 | if (!acquired) { |
382 | r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key, conflicts); |
383 | } |
384 | |
385 | lkr.release(); |
386 | return r; |
387 | } |
388 | |
389 | int locktree::try_acquire_lock(bool is_write_request, |
390 | TXNID txnid, |
391 | const DBT *left_key, const DBT *right_key, |
392 | txnid_set *conflicts, bool big_txn) { |
393 | // All ranges in the locktree must have left endpoints <= right endpoints. |
394 | // Range comparisons rely on this fact, so we make a paranoid invariant here. |
395 | paranoid_invariant(m_cmp(left_key, right_key) <= 0); |
396 | int r = m_mgr == nullptr ? 0 : |
397 | m_mgr->check_current_lock_constraints(big_txn); |
398 | if (r == 0) { |
399 | r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts); |
400 | } |
401 | return r; |
402 | } |
403 | |
404 | // the locktree silently upgrades read locks to write locks for simplicity |
405 | int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key, |
406 | txnid_set *conflicts, bool big_txn) { |
407 | return acquire_write_lock(txnid, left_key, right_key, conflicts, big_txn); |
408 | } |
409 | |
410 | int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key, |
411 | txnid_set *conflicts, bool big_txn) { |
412 | return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn); |
413 | } |
414 | |
415 | void locktree::get_conflicts(bool is_write_request, |
416 | TXNID txnid, const DBT *left_key, const DBT *right_key, |
417 | txnid_set *conflicts) { |
418 | // because we only support write locks, ignore this bit for now. |
419 | (void) is_write_request; |
420 | |
    // prepare and acquire a locked keyrange over the given range
422 | keyrange range; |
423 | range.create(left_key, right_key); |
424 | concurrent_tree::locked_keyrange lkr; |
425 | lkr.prepare(m_rangetree); |
426 | lkr.acquire(range); |
427 | |
428 | // copy out the set of overlapping row locks and determine the conflicts |
429 | GrowableArray<row_lock> overlapping_row_locks; |
430 | overlapping_row_locks.init(); |
431 | iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks); |
432 | |
433 | // we don't care if conflicts exist. we just want the conflicts set populated. |
434 | (void) determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts); |
435 | |
436 | lkr.release(); |
437 | overlapping_row_locks.deinit(); |
438 | range.destroy(); |
439 | } |
440 | |
441 | // Effect: |
442 | // For each range in the lock tree that overlaps the given range and has |
443 | // the given txnid, remove it. |
444 | // Rationale: |
445 | // In the common case, there is only the range [left_key, right_key] and |
446 | // it is associated with txnid, so this is a single tree delete. |
447 | // |
448 | // However, consolidation and escalation change the objects in the tree |
449 | // without telling the txn anything. In this case, the txn may own a |
450 | // large range lock that represents its ownership of many smaller range |
451 | // locks. For example, the txn may think it owns point locks on keys 1, |
452 | // 2, and 3, but due to escalation, only the object [1,3] exists in the |
453 | // tree. |
454 | // |
455 | // The first call for a small lock will remove the large range lock, and |
456 | // the rest of the calls should do nothing. After the first release, |
457 | // another thread can acquire one of the locks that the txn thinks it |
458 | // still owns. That's ok, because the txn doesn't want it anymore (it |
459 | // unlocks everything at once), but it may find a lock that it does not |
460 | // own. |
461 | // |
462 | // In our example, the txn unlocks key 1, which actually removes the |
463 | // whole lock [1,3]. Now, someone else can lock 2 before our txn gets |
464 | // around to unlocking 2, so we should not remove that lock. |
465 | void locktree::remove_overlapping_locks_for_txnid(TXNID txnid, |
466 | const DBT *left_key, |
467 | const DBT *right_key) { |
468 | keyrange release_range; |
469 | release_range.create(left_key, right_key); |
470 | |
471 | // acquire and prepare a locked keyrange over the release range |
472 | concurrent_tree::locked_keyrange lkr; |
473 | lkr.prepare(m_rangetree); |
474 | lkr.acquire(release_range); |
475 | |
476 | // copy out the set of overlapping row locks. |
477 | GrowableArray<row_lock> overlapping_row_locks; |
478 | overlapping_row_locks.init(); |
479 | iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks); |
480 | size_t num_overlapping_row_locks = overlapping_row_locks.get_size(); |
481 | |
482 | for (size_t i = 0; i < num_overlapping_row_locks; i++) { |
483 | row_lock lock = overlapping_row_locks.fetch_unchecked(i); |
484 | // If this isn't our lock, that's ok, just don't remove it. |
485 | // See rationale above. |
486 | if (lock.txnid == txnid) { |
487 | remove_row_lock_from_tree(&lkr, lock, m_mgr); |
488 | } |
489 | } |
490 | |
491 | lkr.release(); |
492 | overlapping_row_locks.deinit(); |
493 | release_range.destroy(); |
494 | } |
495 | |
496 | bool locktree::sto_txnid_is_valid_unsafe(void) const { |
497 | return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE; |
498 | } |
499 | |
500 | int locktree::sto_get_score_unsafe(void) const { |
501 | return toku_unsafe_fetch(m_sto_score); |
502 | } |
503 | |
504 | bool locktree::sto_try_release(TXNID txnid) { |
505 | bool released = false; |
506 | if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) { |
507 | // check the bit again with a prepared locked keyrange, |
508 | // which protects the optimization bits and rangetree data |
509 | concurrent_tree::locked_keyrange lkr; |
510 | lkr.prepare(m_rangetree); |
511 | if (m_sto_txnid != TXNID_NONE) { |
512 | // this txnid better be the single txnid on this locktree, |
513 | // or else we are in big trouble (meaning the logic is broken) |
514 | invariant(m_sto_txnid == txnid); |
515 | invariant(m_rangetree->is_empty()); |
516 | sto_end(); |
517 | released = true; |
518 | } |
519 | lkr.release(); |
520 | } |
521 | return released; |
522 | } |
523 | |
524 | // release all of the locks for a txnid whose endpoints are pairs |
525 | // in the given range buffer. |
526 | void locktree::release_locks(TXNID txnid, const range_buffer *ranges) { |
527 | // try the single txn optimization. if it worked, then all of the |
528 | // locks are already released, otherwise we need to do it here. |
529 | bool released = sto_try_release(txnid); |
530 | if (!released) { |
531 | range_buffer::iterator iter(ranges); |
532 | range_buffer::iterator::record rec; |
533 | while (iter.current(&rec)) { |
534 | const DBT *left_key = rec.get_left_key(); |
535 | const DBT *right_key = rec.get_right_key(); |
536 | // All ranges in the locktree must have left endpoints <= right endpoints. |
537 | // Range comparisons rely on this fact, so we make a paranoid invariant here. |
538 | paranoid_invariant(m_cmp(left_key, right_key) <= 0); |
539 | remove_overlapping_locks_for_txnid(txnid, left_key, right_key); |
540 | iter.next(); |
541 | } |
542 | // Increase the sto score slightly. Eventually it will hit |
543 | // the threshold and we'll try the optimization again. This |
544 | // is how a previously multithreaded system transitions into |
545 | // a single threaded system that benefits from the optimization. |
546 | if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) { |
547 | toku_sync_fetch_and_add(&m_sto_score, 1); |
548 | } |
549 | } |
550 | } |
551 | |
552 | // iterate over a locked keyrange and extract copies of the first N |
553 | // row locks, storing each one into the given array of size N, |
554 | // then removing each extracted lock from the locked keyrange. |
static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
                                     locktree_manager *mgr,
                                     row_lock *row_locks, int num_to_extract) {
558 | |
    struct extract_fn_obj {
        int num_extracted;
        int num_to_extract;
562 | row_lock *row_locks; |
563 | bool fn(const keyrange &range, TXNID txnid) { |
564 | if (num_extracted < num_to_extract) { |
565 | row_lock lock; |
566 | lock.range.create_copy(range); |
567 | lock.txnid = txnid; |
568 | row_locks[num_extracted++] = lock; |
569 | return true; |
570 | } else { |
571 | return false; |
572 | } |
573 | } |
    } extract_fn;
575 | |
576 | extract_fn.row_locks = row_locks; |
577 | extract_fn.num_to_extract = num_to_extract; |
578 | extract_fn.num_extracted = 0; |
579 | lkr->iterate(&extract_fn); |
580 | |
581 | // now that the ranges have been copied out, complete |
582 | // the extraction by removing the ranges from the tree. |
583 | // use remove_row_lock_from_tree() so we properly track the |
584 | // amount of memory and number of locks freed. |
    int num_extracted = extract_fn.num_extracted;
586 | invariant(num_extracted <= num_to_extract); |
587 | for (int i = 0; i < num_extracted; i++) { |
588 | remove_row_lock_from_tree(lkr, row_locks[i], mgr); |
589 | } |
590 | |
591 | return num_extracted; |
592 | } |
593 | |
594 | // Store each newly escalated lock in a range buffer for appropriate txnid. |
595 | // We'll rebuild the locktree by iterating over these ranges, and then we |
596 | // can pass back each txnid/buffer pair individually through a callback |
597 | // to notify higher layers that locks have changed. |
598 | struct txnid_range_buffer { |
599 | TXNID txnid; |
600 | range_buffer buffer; |
601 | |
602 | static int find_by_txnid(struct txnid_range_buffer *const &other_buffer, const TXNID &txnid) { |
603 | if (txnid < other_buffer->txnid) { |
604 | return -1; |
605 | } else if (other_buffer->txnid == txnid) { |
606 | return 0; |
607 | } else { |
608 | return 1; |
609 | } |
610 | } |
611 | }; |
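// The omt of txnid_range_buffers built during escalation is kept in the order
// induced by find_by_txnid: find_zero either locates the existing buffer for a
// txnid or reports the index at which a new one should be inserted (see
// escalate() below).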
612 | |
613 | // escalate the locks in the locktree by merging adjacent |
614 | // locks that have the same txnid into one larger lock. |
615 | // |
616 | // if there's only one txnid in the locktree then this |
617 | // approach works well. if there are many txnids and each |
618 | // has locks in a random/alternating order, then this does |
619 | // not work so well. |
void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_escalate_callback_extra) {
621 | omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers; |
622 | range_buffers.create(); |
623 | |
624 | // prepare and acquire a locked keyrange on the entire locktree |
625 | concurrent_tree::locked_keyrange lkr; |
626 | keyrange infinite_range = keyrange::get_infinite_range(); |
627 | lkr.prepare(m_rangetree); |
628 | lkr.acquire(infinite_range); |
629 | |
630 | // if we're in the single txnid optimization, simply call it off. |
631 | // if you have to run escalation, you probably don't care about |
632 | // the optimization anyway, and this makes things easier. |
633 | if (m_sto_txnid != TXNID_NONE) { |
634 | // We are already accounting for this escalation time and |
635 | // count, so don't do it for sto_end_early too. |
636 | sto_end_early_no_accounting(&lkr); |
637 | } |
638 | |
639 | // extract and remove batches of row locks from the locktree |
    int num_extracted;
641 | const int num_row_locks_per_batch = 128; |
    row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);
643 | |
644 | // we always remove the "first" n because we are removing n |
    // each time we do an extraction. so this loops until it's empty.
646 | while ((num_extracted = |
647 | extract_first_n_row_locks(&lkr, m_mgr, extracted_buf, |
648 | num_row_locks_per_batch)) > 0) { |
649 | int current_index = 0; |
650 | while (current_index < num_extracted) { |
651 | // every batch of extracted locks is in range-sorted order. search |
652 | // through them and merge adjacent locks with the same txnid into |
653 | // one dominating lock and save it to a set of escalated locks. |
654 | // |
655 | // first, find the index of the next row lock with a different txnid |
656 | int next_txnid_index = current_index + 1; |
657 | while (next_txnid_index < num_extracted && |
658 | extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) { |
659 | next_txnid_index++; |
660 | } |
661 | |
662 | // Create an escalated range for the current txnid that dominates |
            // each range between the current index and the next txnid's index.
664 | const TXNID current_txnid = extracted_buf[current_index].txnid; |
665 | const DBT *escalated_left_key = extracted_buf[current_index].range.get_left_key(); |
666 | const DBT *escalated_right_key = extracted_buf[next_txnid_index - 1].range.get_right_key(); |
667 | |
668 | // Try to find a range buffer for the current txnid. Create one if it doesn't exist. |
669 | // Then, append the new escalated range to the buffer. |
670 | uint32_t idx; |
671 | struct txnid_range_buffer *existing_range_buffer; |
672 | int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>( |
673 | current_txnid, |
674 | &existing_range_buffer, |
675 | &idx |
676 | ); |
677 | if (r == DB_NOTFOUND) { |
678 | struct txnid_range_buffer *XMALLOC(new_range_buffer); |
679 | new_range_buffer->txnid = current_txnid; |
680 | new_range_buffer->buffer.create(); |
681 | new_range_buffer->buffer.append(escalated_left_key, escalated_right_key); |
682 | range_buffers.insert_at(new_range_buffer, idx); |
683 | } else { |
684 | invariant_zero(r); |
685 | invariant(existing_range_buffer->txnid == current_txnid); |
686 | existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key); |
687 | } |
688 | |
689 | current_index = next_txnid_index; |
690 | } |
691 | |
692 | // destroy the ranges copied during the extraction |
693 | for (int i = 0; i < num_extracted; i++) { |
694 | extracted_buf[i].range.destroy(); |
695 | } |
696 | } |
697 | toku_free(extracted_buf); |
698 | |
699 | // Rebuild the locktree from each range in each range buffer, |
700 | // then notify higher layers that the txnid's locks have changed. |
701 | invariant(m_rangetree->is_empty()); |
702 | const size_t num_range_buffers = range_buffers.size(); |
703 | for (size_t i = 0; i < num_range_buffers; i++) { |
704 | struct txnid_range_buffer *current_range_buffer; |
705 | int r = range_buffers.fetch(i, ¤t_range_buffer); |
706 | invariant_zero(r); |
707 | |
708 | const TXNID current_txnid = current_range_buffer->txnid; |
709 | range_buffer::iterator iter(¤t_range_buffer->buffer); |
710 | range_buffer::iterator::record rec; |
711 | while (iter.current(&rec)) { |
712 | keyrange range; |
713 | range.create(rec.get_left_key(), rec.get_right_key()); |
714 | row_lock lock = { .range = range, .txnid = current_txnid }; |
715 | insert_row_lock_into_tree(&lkr, lock, m_mgr); |
716 | iter.next(); |
717 | } |
718 | |
719 | // Notify higher layers that locks have changed for the current txnid |
720 | if (after_escalate_callback) { |
721 | after_escalate_callback(current_txnid, this, current_range_buffer->buffer, after_escalate_callback_extra); |
722 | } |
723 | current_range_buffer->buffer.destroy(); |
724 | } |
725 | |
726 | while (range_buffers.size() > 0) { |
727 | struct txnid_range_buffer *buffer; |
728 | int r = range_buffers.fetch(0, &buffer); |
729 | invariant_zero(r); |
730 | r = range_buffers.delete_at(0); |
731 | invariant_zero(r); |
732 | toku_free(buffer); |
733 | } |
734 | range_buffers.destroy(); |
735 | |
736 | lkr.release(); |
737 | } |
738 | |
739 | void *locktree::get_userdata(void) const { |
740 | return m_userdata; |
741 | } |
742 | |
743 | void locktree::set_userdata(void *userdata) { |
744 | m_userdata = userdata; |
745 | } |
746 | |
747 | struct lt_lock_request_info *locktree::get_lock_request_info(void) { |
748 | return &m_lock_request_info; |
749 | } |
750 | |
751 | void locktree::set_comparator(const comparator &cmp) { |
752 | m_cmp.inherit(cmp); |
753 | } |
754 | |
755 | locktree_manager *locktree::get_manager(void) const { |
756 | return m_mgr; |
757 | } |
758 | |
759 | int locktree::compare(const locktree *lt) const { |
760 | if (m_dict_id.dictid < lt->m_dict_id.dictid) { |
761 | return -1; |
762 | } else if (m_dict_id.dictid == lt->m_dict_id.dictid) { |
763 | return 0; |
764 | } else { |
765 | return 1; |
766 | } |
767 | } |
768 | |
769 | DICTIONARY_ID locktree::get_dict_id() const { |
770 | return m_dict_id; |
771 | } |
772 | |
773 | } /* namespace toku */ |
774 | |